xref: /aosp_15_r20/tools/netsim/rust/daemon/src/events.rs (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A module with mpmc channels for distributing global events.
16*cf78ab8cSAndroid Build Coastguard Worker 
17*cf78ab8cSAndroid Build Coastguard Worker use netsim_proto::common::ChipKind;
18*cf78ab8cSAndroid Build Coastguard Worker use std::sync::mpsc::{channel, Receiver, Sender};
19*cf78ab8cSAndroid Build Coastguard Worker 
20*cf78ab8cSAndroid Build Coastguard Worker use crate::devices::chip::ChipIdentifier;
21*cf78ab8cSAndroid Build Coastguard Worker use crate::devices::device::DeviceIdentifier;
22*cf78ab8cSAndroid Build Coastguard Worker use netsim_proto::stats::{
23*cf78ab8cSAndroid Build Coastguard Worker     NetsimDeviceStats as ProtoDeviceStats, NetsimRadioStats as ProtoRadioStats,
24*cf78ab8cSAndroid Build Coastguard Worker };
25*cf78ab8cSAndroid Build Coastguard Worker 
26*cf78ab8cSAndroid Build Coastguard Worker use std::sync::{Arc, Mutex, OnceLock};
27*cf78ab8cSAndroid Build Coastguard Worker 
28*cf78ab8cSAndroid Build Coastguard Worker // Publish the event to all subscribers
publish(event: Event)29*cf78ab8cSAndroid Build Coastguard Worker pub fn publish(event: Event) {
30*cf78ab8cSAndroid Build Coastguard Worker     get_events().lock().expect("Failed to acquire lock on events").publish(event);
31*cf78ab8cSAndroid Build Coastguard Worker }
32*cf78ab8cSAndroid Build Coastguard Worker 
33*cf78ab8cSAndroid Build Coastguard Worker // Subscribe to events over the receiver
subscribe() -> Receiver<Event>34*cf78ab8cSAndroid Build Coastguard Worker pub fn subscribe() -> Receiver<Event> {
35*cf78ab8cSAndroid Build Coastguard Worker     get_events().lock().expect("Failed to acquire locks on events").subscribe()
36*cf78ab8cSAndroid Build Coastguard Worker }
37*cf78ab8cSAndroid Build Coastguard Worker 
38*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
39*cf78ab8cSAndroid Build Coastguard Worker pub struct DeviceAdded {
40*cf78ab8cSAndroid Build Coastguard Worker     pub id: DeviceIdentifier,
41*cf78ab8cSAndroid Build Coastguard Worker     pub name: String,
42*cf78ab8cSAndroid Build Coastguard Worker     pub builtin: bool,
43*cf78ab8cSAndroid Build Coastguard Worker     pub device_stats: ProtoDeviceStats,
44*cf78ab8cSAndroid Build Coastguard Worker }
45*cf78ab8cSAndroid Build Coastguard Worker 
46*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
47*cf78ab8cSAndroid Build Coastguard Worker pub struct DeviceRemoved {
48*cf78ab8cSAndroid Build Coastguard Worker     pub id: DeviceIdentifier,
49*cf78ab8cSAndroid Build Coastguard Worker     pub name: String,
50*cf78ab8cSAndroid Build Coastguard Worker     pub builtin: bool,
51*cf78ab8cSAndroid Build Coastguard Worker }
52*cf78ab8cSAndroid Build Coastguard Worker 
53*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
54*cf78ab8cSAndroid Build Coastguard Worker pub struct DevicePatched {
55*cf78ab8cSAndroid Build Coastguard Worker     pub id: DeviceIdentifier,
56*cf78ab8cSAndroid Build Coastguard Worker     pub name: String,
57*cf78ab8cSAndroid Build Coastguard Worker }
58*cf78ab8cSAndroid Build Coastguard Worker 
59*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
60*cf78ab8cSAndroid Build Coastguard Worker pub struct ChipAdded {
61*cf78ab8cSAndroid Build Coastguard Worker     pub chip_id: ChipIdentifier,
62*cf78ab8cSAndroid Build Coastguard Worker     pub chip_kind: ChipKind,
63*cf78ab8cSAndroid Build Coastguard Worker     pub device_name: String,
64*cf78ab8cSAndroid Build Coastguard Worker     pub builtin: bool,
65*cf78ab8cSAndroid Build Coastguard Worker }
66*cf78ab8cSAndroid Build Coastguard Worker 
67*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug, Default)]
68*cf78ab8cSAndroid Build Coastguard Worker pub struct ChipRemoved {
69*cf78ab8cSAndroid Build Coastguard Worker     pub chip_id: ChipIdentifier,
70*cf78ab8cSAndroid Build Coastguard Worker     pub device_id: DeviceIdentifier,
71*cf78ab8cSAndroid Build Coastguard Worker     pub remaining_nonbuiltin_devices: usize,
72*cf78ab8cSAndroid Build Coastguard Worker     pub radio_stats: Vec<ProtoRadioStats>,
73*cf78ab8cSAndroid Build Coastguard Worker }
74*cf78ab8cSAndroid Build Coastguard Worker 
/// Payload carried by the `Event::ShutDown` variant.
#[derive(Debug, Clone, Default)]
pub struct ShutDown {
    /// Free-form text giving the reason for the shutdown.
    pub reason: String,
}
79*cf78ab8cSAndroid Build Coastguard Worker 
80*cf78ab8cSAndroid Build Coastguard Worker /// Event messages shared across various components in a loosely
81*cf78ab8cSAndroid Build Coastguard Worker /// coupled manner.
82*cf78ab8cSAndroid Build Coastguard Worker #[derive(Clone, Debug)]
83*cf78ab8cSAndroid Build Coastguard Worker pub enum Event {
84*cf78ab8cSAndroid Build Coastguard Worker     DeviceAdded(DeviceAdded),
85*cf78ab8cSAndroid Build Coastguard Worker     DeviceRemoved(DeviceRemoved),
86*cf78ab8cSAndroid Build Coastguard Worker     DevicePatched(DevicePatched),
87*cf78ab8cSAndroid Build Coastguard Worker     DeviceReset,
88*cf78ab8cSAndroid Build Coastguard Worker     ChipAdded(ChipAdded),
89*cf78ab8cSAndroid Build Coastguard Worker     ChipRemoved(ChipRemoved),
90*cf78ab8cSAndroid Build Coastguard Worker     ShutDown(ShutDown),
91*cf78ab8cSAndroid Build Coastguard Worker }
92*cf78ab8cSAndroid Build Coastguard Worker 
93*cf78ab8cSAndroid Build Coastguard Worker static EVENTS: OnceLock<Arc<Mutex<Events>>> = OnceLock::new();
94*cf78ab8cSAndroid Build Coastguard Worker 
get_events() -> Arc<Mutex<Events>>95*cf78ab8cSAndroid Build Coastguard Worker pub fn get_events() -> Arc<Mutex<Events>> {
96*cf78ab8cSAndroid Build Coastguard Worker     EVENTS.get_or_init(Events::new).clone()
97*cf78ab8cSAndroid Build Coastguard Worker }
98*cf78ab8cSAndroid Build Coastguard Worker 
99*cf78ab8cSAndroid Build Coastguard Worker /// A multi-producer, multi-consumer broadcast queue based on
100*cf78ab8cSAndroid Build Coastguard Worker /// `std::sync::mpsc`.
101*cf78ab8cSAndroid Build Coastguard Worker ///
102*cf78ab8cSAndroid Build Coastguard Worker /// Each Event message `published` is seen by all subscribers.
103*cf78ab8cSAndroid Build Coastguard Worker ///
104*cf78ab8cSAndroid Build Coastguard Worker /// Warning: invoke `subscribe()` before `publish()` or else messages
105*cf78ab8cSAndroid Build Coastguard Worker /// will be lost.
106*cf78ab8cSAndroid Build Coastguard Worker ///
107*cf78ab8cSAndroid Build Coastguard Worker pub struct Events {
108*cf78ab8cSAndroid Build Coastguard Worker     // For each subscriber this module retrain the sender half and the
109*cf78ab8cSAndroid Build Coastguard Worker     // subscriber reads events from the receiver half.
110*cf78ab8cSAndroid Build Coastguard Worker     subscribers: Vec<Sender<Event>>,
111*cf78ab8cSAndroid Build Coastguard Worker }
112*cf78ab8cSAndroid Build Coastguard Worker 
113*cf78ab8cSAndroid Build Coastguard Worker impl Events {
114*cf78ab8cSAndroid Build Coastguard Worker     // Events is always owned by multiple publishers and subscribers
115*cf78ab8cSAndroid Build Coastguard Worker     // across threads so return an Arc type.
new() -> Arc<Mutex<Events>>116*cf78ab8cSAndroid Build Coastguard Worker     fn new() -> Arc<Mutex<Events>> {
117*cf78ab8cSAndroid Build Coastguard Worker         Arc::new(Mutex::new(Self { subscribers: Vec::new() }))
118*cf78ab8cSAndroid Build Coastguard Worker     }
119*cf78ab8cSAndroid Build Coastguard Worker 
120*cf78ab8cSAndroid Build Coastguard Worker     // Creates a new asynchronous channel, returning the receiver
121*cf78ab8cSAndroid Build Coastguard Worker     // half. All `Event` messages sent through `publish` will become
122*cf78ab8cSAndroid Build Coastguard Worker     // available on the receiver in the same order as it was sent.
subscribe(&mut self) -> Receiver<Event>123*cf78ab8cSAndroid Build Coastguard Worker     fn subscribe(&mut self) -> Receiver<Event> {
124*cf78ab8cSAndroid Build Coastguard Worker         let (tx, rx) = channel::<Event>();
125*cf78ab8cSAndroid Build Coastguard Worker         self.subscribers.push(tx);
126*cf78ab8cSAndroid Build Coastguard Worker         rx
127*cf78ab8cSAndroid Build Coastguard Worker     }
128*cf78ab8cSAndroid Build Coastguard Worker 
129*cf78ab8cSAndroid Build Coastguard Worker     // Attempts to send an Event on the events channel.
publish(&mut self, msg: Event)130*cf78ab8cSAndroid Build Coastguard Worker     pub fn publish(&mut self, msg: Event) {
131*cf78ab8cSAndroid Build Coastguard Worker         if self.subscribers.is_empty() {
132*cf78ab8cSAndroid Build Coastguard Worker             log::warn!("No Subscribers to the event: {msg:?}");
133*cf78ab8cSAndroid Build Coastguard Worker         } else {
134*cf78ab8cSAndroid Build Coastguard Worker             // Any channel with a disconnected receiver will return an
135*cf78ab8cSAndroid Build Coastguard Worker             // error and be removed by retain.
136*cf78ab8cSAndroid Build Coastguard Worker             log::info!("{msg:?}");
137*cf78ab8cSAndroid Build Coastguard Worker             self.subscribers.retain(|subscriber| subscriber.send(msg.clone()).is_ok())
138*cf78ab8cSAndroid Build Coastguard Worker         }
139*cf78ab8cSAndroid Build Coastguard Worker     }
140*cf78ab8cSAndroid Build Coastguard Worker }
141*cf78ab8cSAndroid Build Coastguard Worker 
/// Public wrappers, compiled only for tests, that let test code work with a
/// local `Events` instance.
#[cfg(test)]
pub mod test {
    use super::*;

    /// Creates a fresh `Events` instance via `Events::new`.
    pub fn new() -> Arc<Mutex<Events>> {
        Events::new()
    }

    /// Locks `s` and publishes `msg` on it.
    ///
    /// Panics if the mutex is poisoned.
    pub fn publish(s: &mut Arc<Mutex<Events>>, msg: Event) {
        let mut events = s.lock().unwrap();
        events.publish(msg);
    }

    /// Locks `s`, registers a new subscriber on it and returns the receiver.
    ///
    /// Panics if the mutex is poisoned.
    pub fn subscribe(s: &mut Arc<Mutex<Events>>) -> Receiver<Event> {
        let mut events = s.lock().unwrap();
        events.subscribe()
    }
}
159*cf78ab8cSAndroid Build Coastguard Worker 
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds the `DeviceAdded` event that every test in this module
    /// publishes.
    fn sample_event() -> Event {
        Event::DeviceAdded(DeviceAdded {
            id: DeviceIdentifier(123),
            name: "Device1".into(),
            builtin: false,
            device_stats: ProtoDeviceStats::new(),
        })
    }

    /// Blocks until `rx` yields one message and asserts that it is the event
    /// built by `sample_event()`.
    fn expect_sample_event(rx: Receiver<Event>) {
        let Ok(Event::DeviceAdded(DeviceAdded { id, name, builtin, device_stats })) = rx.recv()
        else {
            panic!("Unexpected event");
        };
        assert_eq!(id.0, 123);
        assert_eq!(name, "Device1");
        assert!(!builtin);
        assert_eq!(device_stats, ProtoDeviceStats::new());
    }

    /// A single subscriber receives the published event.
    #[test]
    fn test_subscribe_and_publish() {
        let events = Events::new();

        let rx = events.lock().unwrap().subscribe();
        let receiver_thread = thread::spawn(move || expect_sample_event(rx));

        events.lock().unwrap().publish(sample_event());

        // Wait for the other thread to process the message.
        receiver_thread.join().unwrap();
    }

    /// Every one of several subscribers receives the published event.
    #[test]
    fn test_publish_to_multiple_subscribers() {
        let events = Events::new();

        // Subscribe everyone first; `collect` runs all subscriptions before
        // the event is published below.
        let num_subscribers = 10;
        let receiver_threads: Vec<_> = (0..num_subscribers)
            .map(|_| {
                let rx = events.lock().unwrap().subscribe();
                thread::spawn(move || expect_sample_event(rx))
            })
            .collect();

        events.lock().unwrap().publish(sample_event());

        // Wait for the other threads to process the message.
        for receiver_thread in receiver_threads {
            receiver_thread.join().unwrap();
        }
    }

    /// Test the case where the receiver returned by `subscribe()` is dropped.
    /// We expect the subscriber to be removed automatically when `send()`
    /// reports an error during the next publish.
    #[test]
    fn test_publish_to_dropped_subscriber() {
        let events = Events::new();

        let rx = events.lock().unwrap().subscribe();
        assert_eq!(events.lock().unwrap().subscribers.len(), 1);

        drop(rx);
        events.lock().unwrap().publish(sample_event());

        assert_eq!(events.lock().unwrap().subscribers.len(), 0);
    }
}
246