1 // Copyright 2022, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! The core event loop for Rust modules. Here Rust modules are started in
16 //! dependency order.
17 
18 use gatt::{channel::AttTransport, GattCallbacks};
19 use log::{info, warn};
20 use tokio::task::LocalSet;
21 
22 use std::{rc::Rc, sync::Mutex};
23 use tokio::runtime::Builder;
24 
25 use tokio::sync::mpsc;
26 
27 pub mod core;
28 pub mod gatt;
29 pub mod packets;
30 pub mod utils;
31 
32 /// The owner of the main Rust thread on which all Rust modules run
33 struct GlobalModuleRegistry {
34     pub task_tx: MainThreadTx,
35 }
36 
37 /// The ModuleViews lets us access all publicly accessible Rust modules from
38 /// Java / C++ while the stack is running. If a module should not be exposed
39 /// outside of Rust GD, there is no need to include it here.
40 pub struct ModuleViews<'a> {
41     /// Lets us call out into C++
42     pub gatt_outgoing_callbacks: Rc<dyn GattCallbacks>,
43     /// Receives synchronous callbacks from JNI
44     pub gatt_incoming_callbacks: Rc<gatt::callbacks::CallbackTransactionManager>,
45     /// Proxies calls into GATT server
46     pub gatt_module: &'a mut gatt::server::GattModule,
47 }
48 
49 static GLOBAL_MODULE_REGISTRY: Mutex<Option<GlobalModuleRegistry>> = Mutex::new(None);
50 
51 impl GlobalModuleRegistry {
52     /// Handles bringup of all Rust modules. This occurs after GD C++ modules
53     /// have started, but before the legacy stack has initialized.
54     /// Must be invoked from the Rust thread after JNI initializes it and passes
55     /// in JNI modules.
start( gatt_callbacks: Rc<dyn GattCallbacks>, att_transport: Rc<dyn AttTransport>, on_started: impl FnOnce(), )56     pub fn start(
57         gatt_callbacks: Rc<dyn GattCallbacks>,
58         att_transport: Rc<dyn AttTransport>,
59         on_started: impl FnOnce(),
60     ) {
61         info!("starting Rust modules");
62         let rt = Builder::new_current_thread()
63             .enable_all()
64             .build()
65             .expect("failed to start tokio runtime");
66         let local = LocalSet::new();
67 
68         let (tx, mut rx) = mpsc::unbounded_channel();
69         let prev_registry = GLOBAL_MODULE_REGISTRY.lock().unwrap().replace(Self { task_tx: tx });
70 
71         // initialization should only happen once
72         assert!(prev_registry.is_none());
73 
74         // First, setup FFI and C++ modules
75         let arbiter = gatt::arbiter::initialize_arbiter();
76 
77         // Now enter the runtime
78         local.block_on(&rt, async move {
79             // Then follow the pure-Rust modules
80             let gatt_incoming_callbacks =
81                 Rc::new(gatt::callbacks::CallbackTransactionManager::new(gatt_callbacks.clone()));
82             let gatt_module = &mut gatt::server::GattModule::new(att_transport.clone(), arbiter);
83 
84             // All modules that are visible from incoming JNI / top-level interfaces should
85             // be exposed here
86             let mut modules = ModuleViews {
87                 gatt_outgoing_callbacks: gatt_callbacks,
88                 gatt_incoming_callbacks,
89                 gatt_module,
90             };
91 
92             // notify upper layer that we are ready to receive messages
93             on_started();
94 
95             // This is the core event loop that serializes incoming requests into the Rust
96             // thread do_in_rust_thread lets us post into here from foreign
97             // threads
98             info!("starting Tokio event loop");
99             while let Some(message) = rx.recv().await {
100                 match message {
101                     MainThreadTxMessage::Callback(f) => f(&mut modules),
102                     MainThreadTxMessage::Stop => {
103                         break;
104                     }
105                 }
106             }
107         });
108         warn!("Rust thread queue has stopped, shutting down executor thread");
109         GLOBAL_MODULE_REGISTRY.lock().unwrap().take();
110         gatt::arbiter::clean_arbiter();
111     }
112 }
113 
114 type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>;
115 enum MainThreadTxMessage {
116     Callback(BoxedMainThreadCallback),
117     Stop,
118 }
119 type MainThreadTx = mpsc::UnboundedSender<MainThreadTxMessage>;
120 
121 thread_local! {
122     /// The TX end of a channel into the Rust thread, so external callers can
123     /// access Rust modules. JNI / direct FFI should use do_in_rust_thread for
124     /// convenience, but objects passed into C++ as callbacks should
125     /// clone this channel to fail loudly if it's not yet initialized.
126     ///
127     /// This will be lazily initialized on first use from each client thread
128     static MAIN_THREAD_TX: MainThreadTx =
129         GLOBAL_MODULE_REGISTRY.lock().unwrap().as_ref().expect("stack not initialized").task_tx.clone();
130 }
131 
132 /// Posts a callback to the Rust thread and gives it access to public Rust
133 /// modules, used from JNI.
134 ///
135 /// Do not call this from Rust modules / the Rust thread! Instead, Rust modules
136 /// should receive references to their dependent modules at startup. If passing
137 /// callbacks into C++, don't use this method either - instead, acquire a clone
138 /// of MAIN_THREAD_TX when the callback is created. This ensures that there
139 /// never are "invalid" callbacks that may still work depending on when the
140 /// GLOBAL_MODULE_REGISTRY is initialized.
do_in_rust_thread<F>(f: F) where F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,141 pub fn do_in_rust_thread<F>(f: F)
142 where
143     F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,
144 {
145     let ret = MAIN_THREAD_TX.with(|tx| tx.send(MainThreadTxMessage::Callback(Box::new(f))));
146     if ret.is_err() {
147         panic!("Rust call failed");
148     }
149 }
150