1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! A module with mpmc channels for distributing global events.
16
17 use netsim_proto::common::ChipKind;
18 use std::sync::mpsc::{channel, Receiver, Sender};
19
20 use crate::devices::chip::ChipIdentifier;
21 use crate::devices::device::DeviceIdentifier;
22 use netsim_proto::stats::{
23 NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats,
24 };
25
26 use std::sync::{Arc, Mutex, OnceLock};
27
28 // Publish the event to all subscribers
publish(event: Event)29 pub fn publish(event: Event) {
30 get_events().lock().expect("Failed to acquire lock on events").publish(event);
31 }
32
33 // Subscribe to events over the receiver
subscribe() -> Receiver<Event>34 pub fn subscribe() -> Receiver<Event> {
35 get_events().lock().expect("Failed to acquire locks on events").subscribe()
36 }
37
38 #[derive(Clone, Debug, Default)]
39 pub struct DeviceAdded {
40 pub id: DeviceIdentifier,
41 pub name: String,
42 pub builtin: bool,
43 pub device_stats: ProtoDeviceStats,
44 }
45
46 #[derive(Clone, Debug, Default)]
47 pub struct DeviceRemoved {
48 pub id: DeviceIdentifier,
49 pub name: String,
50 pub builtin: bool,
51 }
52
53 #[derive(Clone, Debug, Default)]
54 pub struct DevicePatched {
55 pub id: DeviceIdentifier,
56 pub name: String,
57 }
58
59 #[derive(Clone, Debug, Default)]
60 pub struct ChipAdded {
61 pub chip_id: ChipIdentifier,
62 pub chip_kind: ChipKind,
63 pub device_name: String,
64 pub builtin: bool,
65 }
66
67 #[derive(Clone, Debug, Default)]
68 pub struct ChipRemoved {
69 pub chip_id: ChipIdentifier,
70 pub device_id: DeviceIdentifier,
71 pub remaining_nonbuiltin_devices: usize,
72 pub radio_stats: Vec<ProtoRadioStats>,
73 }
74
/// Payload carried by [`Event::ShutDown`].
#[derive(Debug, Default, Clone)]
pub struct ShutDown {
    /// Free-form text describing why the shutdown was requested.
    pub reason: String,
}
79
80 /// Event messages shared across various components in a loosely
81 /// coupled manner.
82 #[derive(Clone, Debug)]
83 pub enum Event {
84 DeviceAdded(DeviceAdded),
85 DeviceRemoved(DeviceRemoved),
86 DevicePatched(DevicePatched),
87 DeviceReset,
88 ChipAdded(ChipAdded),
89 ChipRemoved(ChipRemoved),
90 ShutDown(ShutDown),
91 }
92
93 static EVENTS: OnceLock<Arc<Mutex<Events>>> = OnceLock::new();
94
get_events() -> Arc<Mutex<Events>>95 pub fn get_events() -> Arc<Mutex<Events>> {
96 EVENTS.get_or_init(Events::new).clone()
97 }
98
99 /// A multi-producer, multi-consumer broadcast queue based on
100 /// `std::sync::mpsc`.
101 ///
102 /// Each Event message `published` is seen by all subscribers.
103 ///
104 /// Warning: invoke `subscribe()` before `publish()` or else messages
105 /// will be lost.
106 ///
107 pub struct Events {
108 // For each subscriber this module retrain the sender half and the
109 // subscriber reads events from the receiver half.
110 subscribers: Vec<Sender<Event>>,
111 }
112
113 impl Events {
114 // Events is always owned by multiple publishers and subscribers
115 // across threads so return an Arc type.
new() -> Arc<Mutex<Events>>116 fn new() -> Arc<Mutex<Events>> {
117 Arc::new(Mutex::new(Self { subscribers: Vec::new() }))
118 }
119
120 // Creates a new asynchronous channel, returning the receiver
121 // half. All `Event` messages sent through `publish` will become
122 // available on the receiver in the same order as it was sent.
subscribe(&mut self) -> Receiver<Event>123 fn subscribe(&mut self) -> Receiver<Event> {
124 let (tx, rx) = channel::<Event>();
125 self.subscribers.push(tx);
126 rx
127 }
128
129 // Attempts to send an Event on the events channel.
publish(&mut self, msg: Event)130 pub fn publish(&mut self, msg: Event) {
131 if self.subscribers.is_empty() {
132 log::warn!("No Subscribers to the event: {msg:?}");
133 } else {
134 // Any channel with a disconnected receiver will return an
135 // error and be removed by retain.
136 log::info!("{msg:?}");
137 self.subscribers.retain(|subscriber| subscriber.send(msg.clone()).is_ok())
138 }
139 }
140 }
141
// Public wrappers, compiled only for tests, that allow testing against a local
// `Events` instance instead of the process-wide one.
#[cfg(test)]
pub mod test {
    use super::*;

    /// Creates a fresh, independent event queue.
    pub fn new() -> Arc<Mutex<Events>> {
        Events::new()
    }

    /// Publishes `msg` to every subscriber of the queue `s`.
    pub fn publish(s: &mut Arc<Mutex<Events>>, msg: Event) {
        let mut queue = s.lock().unwrap();
        queue.publish(msg);
    }

    /// Registers a new subscriber on the queue `s` and returns its receiver.
    pub fn subscribe(s: &mut Arc<Mutex<Events>>) -> Receiver<Event> {
        let mut queue = s.lock().unwrap();
        queue.subscribe()
    }
}
159
#[cfg(test)]
mod tests {
    use super::Events;
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Builds the `DeviceAdded` event that every test publishes.
    fn sample_event() -> Event {
        Event::DeviceAdded(DeviceAdded {
            id: DeviceIdentifier(123),
            name: "Device1".into(),
            builtin: false,
            device_stats: ProtoDeviceStats::new(),
        })
    }

    /// Blocks until one event arrives on `rx` and asserts that it matches the event
    /// built by `sample_event`.
    fn expect_sample_event(rx: Receiver<Event>) {
        let Ok(Event::DeviceAdded(added)) = rx.recv() else {
            panic!("Unexpected event");
        };
        assert_eq!(added.id.0, 123);
        assert_eq!(added.name, "Device1");
        assert!(!added.builtin);
        assert_eq!(added.device_stats, ProtoDeviceStats::new());
    }

    #[test]
    fn test_subscribe_and_publish() {
        let events = Events::new();

        // Subscribe before publishing so the event is not lost.
        let shared = Arc::clone(&events);
        let rx = shared.lock().unwrap().subscribe();
        let subscriber = thread::spawn(move || expect_sample_event(rx));

        events.lock().unwrap().publish(sample_event());

        // Wait for the other thread to process the message.
        subscriber.join().unwrap();
    }

    #[test]
    fn test_publish_to_multiple_subscribers() {
        let events = Events::new();

        let num_subscribers = 10;
        // `collect` forces every subscription to happen before the publish below.
        let subscribers: Vec<_> = (0..num_subscribers)
            .map(|_| {
                let shared = Arc::clone(&events);
                let rx = shared.lock().unwrap().subscribe();
                thread::spawn(move || expect_sample_event(rx))
            })
            .collect();

        events.lock().unwrap().publish(sample_event());

        // Wait for the other threads to process the message.
        for subscriber in subscribers {
            subscriber.join().unwrap();
        }
    }

    // Covers the case where the receiver returned by subscribe() is dropped: the
    // subscriber is expected to be removed automatically once send() reports an error.
    #[test]
    fn test_publish_to_dropped_subscriber() {
        let events = Events::new();

        let rx = events.lock().unwrap().subscribe();
        assert_eq!(events.lock().unwrap().subscribers.len(), 1);

        drop(rx);
        events.lock().unwrap().publish(sample_event());

        assert_eq!(events.lock().unwrap().subscribers.len(), 0);
    }
}
246