xref: /aosp_15_r20/tools/netsim/rust/daemon/src/events.rs (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! A module with mpmc channels for distributing global events.
16 
17 use netsim_proto::common::ChipKind;
18 use std::sync::mpsc::{channel, Receiver, Sender};
19 
20 use crate::devices::chip::ChipIdentifier;
21 use crate::devices::device::DeviceIdentifier;
22 use netsim_proto::stats::{
23     NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats,
24 };
25 
26 use std::sync::{Arc, Mutex, OnceLock};
27 
28 // Publish the event to all subscribers
publish(event: Event)29 pub fn publish(event: Event) {
30     get_events().lock().expect("Failed to acquire lock on events").publish(event);
31 }
32 
33 // Subscribe to events over the receiver
subscribe() -> Receiver<Event>34 pub fn subscribe() -> Receiver<Event> {
35     get_events().lock().expect("Failed to acquire locks on events").subscribe()
36 }
37 
38 #[derive(Clone, Debug, Default)]
39 pub struct DeviceAdded {
40     pub id: DeviceIdentifier,
41     pub name: String,
42     pub builtin: bool,
43     pub device_stats: ProtoDeviceStats,
44 }
45 
46 #[derive(Clone, Debug, Default)]
47 pub struct DeviceRemoved {
48     pub id: DeviceIdentifier,
49     pub name: String,
50     pub builtin: bool,
51 }
52 
53 #[derive(Clone, Debug, Default)]
54 pub struct DevicePatched {
55     pub id: DeviceIdentifier,
56     pub name: String,
57 }
58 
59 #[derive(Clone, Debug, Default)]
60 pub struct ChipAdded {
61     pub chip_id: ChipIdentifier,
62     pub chip_kind: ChipKind,
63     pub device_name: String,
64     pub builtin: bool,
65 }
66 
67 #[derive(Clone, Debug, Default)]
68 pub struct ChipRemoved {
69     pub chip_id: ChipIdentifier,
70     pub device_id: DeviceIdentifier,
71     pub remaining_nonbuiltin_devices: usize,
72     pub radio_stats: Vec<ProtoRadioStats>,
73 }
74 
/// Payload carried by [`Event::ShutDown`].
#[derive(Debug, Default, Clone)]
pub struct ShutDown {
    /// Free-form text giving the reason attached to the shutdown event.
    pub reason: String,
}
79 
80 /// Event messages shared across various components in a loosely
81 /// coupled manner.
82 #[derive(Clone, Debug)]
83 pub enum Event {
84     DeviceAdded(DeviceAdded),
85     DeviceRemoved(DeviceRemoved),
86     DevicePatched(DevicePatched),
87     DeviceReset,
88     ChipAdded(ChipAdded),
89     ChipRemoved(ChipRemoved),
90     ShutDown(ShutDown),
91 }
92 
93 static EVENTS: OnceLock<Arc<Mutex<Events>>> = OnceLock::new();
94 
get_events() -> Arc<Mutex<Events>>95 pub fn get_events() -> Arc<Mutex<Events>> {
96     EVENTS.get_or_init(Events::new).clone()
97 }
98 
99 /// A multi-producer, multi-consumer broadcast queue based on
100 /// `std::sync::mpsc`.
101 ///
102 /// Each Event message `published` is seen by all subscribers.
103 ///
104 /// Warning: invoke `subscribe()` before `publish()` or else messages
105 /// will be lost.
106 ///
107 pub struct Events {
108     // For each subscriber this module retrain the sender half and the
109     // subscriber reads events from the receiver half.
110     subscribers: Vec<Sender<Event>>,
111 }
112 
113 impl Events {
114     // Events is always owned by multiple publishers and subscribers
115     // across threads so return an Arc type.
new() -> Arc<Mutex<Events>>116     fn new() -> Arc<Mutex<Events>> {
117         Arc::new(Mutex::new(Self { subscribers: Vec::new() }))
118     }
119 
120     // Creates a new asynchronous channel, returning the receiver
121     // half. All `Event` messages sent through `publish` will become
122     // available on the receiver in the same order as it was sent.
subscribe(&mut self) -> Receiver<Event>123     fn subscribe(&mut self) -> Receiver<Event> {
124         let (tx, rx) = channel::<Event>();
125         self.subscribers.push(tx);
126         rx
127     }
128 
129     // Attempts to send an Event on the events channel.
publish(&mut self, msg: Event)130     pub fn publish(&mut self, msg: Event) {
131         if self.subscribers.is_empty() {
132             log::warn!("No Subscribers to the event: {msg:?}");
133         } else {
134             // Any channel with a disconnected receiver will return an
135             // error and be removed by retain.
136             log::info!("{msg:?}");
137             self.subscribers.retain(|subscriber| subscriber.send(msg.clone()).is_ok())
138         }
139     }
140 }
141 
// Public test-only wrappers so that other modules' tests can work with a
// local `Events` instance instead of the process-wide one.
#[cfg(test)]
pub mod test {
    use super::*;

    /// Creates a fresh `Events` instance that is independent of the
    /// global one.
    pub fn new() -> Arc<Mutex<Events>> {
        Events::new()
    }

    /// Publishes `msg` on the given local `Events` instance.
    ///
    /// Panics if the mutex is poisoned.
    pub fn publish(s: &mut Arc<Mutex<Events>>, msg: Event) {
        let mut events = s.lock().unwrap();
        events.publish(msg);
    }

    /// Subscribes to the given local `Events` instance and returns the
    /// receiver.
    ///
    /// Panics if the mutex is poisoned.
    pub fn subscribe(s: &mut Arc<Mutex<Events>>) -> Receiver<Event> {
        let mut events = s.lock().unwrap();
        events.subscribe()
    }
}
159 
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds the `DeviceAdded` event published by every test below.
    fn sample_event() -> Event {
        Event::DeviceAdded(DeviceAdded {
            id: DeviceIdentifier(123),
            name: "Device1".into(),
            builtin: false,
            device_stats: ProtoDeviceStats::new(),
        })
    }

    /// Spawns a thread that waits for one event on `rx` and asserts that
    /// it equals the event built by `sample_event`.
    fn spawn_checker(rx: Receiver<Event>) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let Ok(Event::DeviceAdded(added)) = rx.recv() else {
                panic!("Unexpected event");
            };
            assert_eq!(added.id.0, 123);
            assert_eq!(added.name, "Device1");
            assert!(!added.builtin);
            assert_eq!(added.device_stats, ProtoDeviceStats::new());
        })
    }

    #[test]
    fn test_subscribe_and_publish() {
        let events = Events::new();

        // Subscribe before publishing, otherwise the event would be lost.
        let rx = events.lock().unwrap().subscribe();
        let checker = spawn_checker(rx);

        events.lock().unwrap().publish(sample_event());

        // Wait for the other thread to process the message.
        checker.join().unwrap();
    }

    #[test]
    fn test_publish_to_multiple_subscribers() {
        const SUBSCRIBER_COUNT: usize = 10;
        let events = Events::new();

        // `collect` drives the iterator eagerly, so all subscriptions
        // exist before the event is published.
        let checkers: Vec<_> = (0..SUBSCRIBER_COUNT)
            .map(|_| {
                let rx = events.lock().unwrap().subscribe();
                spawn_checker(rx)
            })
            .collect();

        events.lock().unwrap().publish(sample_event());

        // Wait for the other threads to process the message.
        for checker in checkers {
            checker.join().unwrap();
        }
    }

    // Test the case where the receiver half of the channel returned by
    // subscribe() is dropped. We expect the subscriber to be removed
    // automatically when send() reports an error.
    #[test]
    fn test_publish_to_dropped_subscriber() {
        let events = Events::new();
        let subscriber_count = || events.lock().unwrap().subscribers.len();

        let rx = events.lock().unwrap().subscribe();
        assert_eq!(subscriber_count(), 1);

        drop(rx);
        events.lock().unwrap().publish(sample_event());
        assert_eq!(subscriber_count(), 0);
    }
}
246