// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;

use async_task::Task;
use base::warn;
use base::AsRawDescriptor;
use base::AsRawDescriptors;
use base::RawDescriptor;
use futures::task::noop_waker;
use pin_utils::pin_mut;
use sync::Mutex;

use crate::queue::RunnableQueue;
use crate::waker::WeakWake;
use crate::AsyncError;
use crate::AsyncResult;
use crate::BlockingPool;
use crate::DetachedTasks;
use crate::ExecutorTrait;
use crate::IntoAsync;
use crate::IoSource;
use crate::TaskHandle;

/// Abstraction for IO backends.
///
/// A `Reactor` is owned by a `RawExecutor`, which calls into it to block for work, to be woken
/// up, to create IO sources and to wrap task handles.
pub trait Reactor: Send + Sync + Sized {
    /// Create a new instance of the backend.
    ///
    /// Failures are reported as `std::io::Result`; `RawExecutor::new` converts them into
    /// `AsyncError::Io`.
    fn new() -> Result<Self>;

    /// Called when the executor is being dropped to allow orderly shutdown (e.g. cancelling IO
    /// work). The returned future will be run to completion.
    ///
    /// Note that, since this is called from `RawExecutor::drop`, there will not be any
    /// `Arc<Executor>` left, so weak references to the executor will always fail to upgrade at
    /// this point. Reactors can potentially make use of this fact to keep more IO work from being
    /// submitted.
    fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;

    /// Called when an executor run loop starts on a thread.
    ///
    /// The default implementation does nothing.
    fn on_thread_start(&self) {}

    /// Block until an event occurs (e.g. IO work is ready) or until `wake` is called.
    ///
    /// As an optimization, `set_processing` should be called immediately after wake up (i.e.
    /// before any bookkeeping is done) so that concurrent calls to wakers can safely skip making
    /// redundant calls to `Reactor::wake`.
    fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()>;

    /// Wake up any pending `wait_for_work` calls. If there are none pending, then wake up the next
    /// `wait_for_work` call (necessary to avoid race conditions).
    fn wake(&self);

    /// Create an `IoSource` for the backend.
    ///
    /// `ex` is the executor that owns this reactor and `f` is the descriptor-backed object to
    /// wrap.
    fn new_source<F: AsRawDescriptor>(
        &self,
        ex: &Arc<RawExecutor<Self>>,
        f: F,
    ) -> AsyncResult<IoSource<F>>;

    /// Convert the backend-generic `RawTaskHandle` into the `TaskHandle` type that the executor's
    /// spawn methods hand back to their callers.
    fn wrap_task_handle<R>(task: RawTaskHandle<Self, R>) -> TaskHandle<R>;
}

// Executor states stored in `RawExecutor::state`.
// NOTE(review): the specific bit patterns look arbitrary; this file only ever compares the state
// against these constants for equality.

// Indicates the executor is either within or about to make a `Reactor::wait_for_work` call. When a
// waker sees this value, it will call `Reactor::wake`.
const WAITING: i32 = 0x1d5b_c019u32 as i32;

// Indicates the executor is processing futures.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;

// Indicates one or more futures may be ready to make progress (i.e. causes the main loop to return
// directly to PROCESSING instead of WAITING).
81 const WOKEN: i32 = 0x3e4d_3276u32 as i32; 82 83 pub struct RawExecutor<Re: Reactor + 'static> { 84 pub reactor: Re, 85 queue: RunnableQueue, 86 blocking_pool: BlockingPool, 87 state: AtomicI32, 88 detached_tasks: Mutex<DetachedTasks>, 89 } 90 91 impl<Re: Reactor> RawExecutor<Re> { new_with(reactor: Re) -> AsyncResult<Arc<Self>>92 pub fn new_with(reactor: Re) -> AsyncResult<Arc<Self>> { 93 Ok(Arc::new(RawExecutor { 94 reactor, 95 queue: RunnableQueue::new(), 96 blocking_pool: Default::default(), 97 state: AtomicI32::new(PROCESSING), 98 detached_tasks: Mutex::new(DetachedTasks::new()), 99 })) 100 } 101 new() -> AsyncResult<Arc<Self>>102 pub fn new() -> AsyncResult<Arc<Self>> { 103 Self::new_with(Re::new().map_err(AsyncError::Io)?) 104 } 105 wake(&self)106 fn wake(&self) { 107 let oldstate = self.state.swap(WOKEN, Ordering::AcqRel); 108 if oldstate == WAITING { 109 self.reactor.wake(); 110 } 111 } 112 run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output>113 fn run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output> { 114 self.reactor.on_thread_start(); 115 116 pin_mut!(done); 117 118 loop { 119 self.state.store(PROCESSING, Ordering::Release); 120 for runnable in self.queue.iter() { 121 runnable.run(); 122 } 123 124 if let Ok(mut tasks) = self.detached_tasks.try_lock() { 125 tasks.poll(cx); 126 } 127 128 if let Poll::Ready(val) = done.as_mut().poll(cx) { 129 return Ok(val); 130 } 131 132 let oldstate = self.state.compare_exchange( 133 PROCESSING, 134 WAITING, 135 Ordering::AcqRel, 136 Ordering::Acquire, 137 ); 138 if let Err(oldstate) = oldstate { 139 debug_assert_eq!(oldstate, WOKEN); 140 // One or more futures have become runnable. 
141 continue; 142 } 143 144 self.reactor 145 .wait_for_work(|| self.state.store(PROCESSING, Ordering::Release)) 146 .map_err(AsyncError::Io)?; 147 } 148 } 149 } 150 151 impl<Re: Reactor + 'static> ExecutorTrait for Arc<RawExecutor<Re>> { async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>152 fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> { 153 self.reactor.new_source(self, f) 154 } 155 spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,156 fn spawn<F>(&self, f: F) -> TaskHandle<F::Output> 157 where 158 F: Future + Send + 'static, 159 F::Output: Send + 'static, 160 { 161 let raw = Arc::downgrade(self); 162 let schedule = move |runnable| { 163 if let Some(r) = raw.upgrade() { 164 r.queue.push_back(runnable); 165 r.wake(); 166 } 167 }; 168 let (runnable, task) = async_task::spawn(f, schedule); 169 runnable.schedule(); 170 Re::wrap_task_handle(RawTaskHandle { 171 task, 172 raw: Arc::downgrade(self), 173 }) 174 } 175 spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,176 fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> 177 where 178 F: Future + 'static, 179 F::Output: 'static, 180 { 181 let raw = Arc::downgrade(self); 182 let schedule = move |runnable| { 183 if let Some(r) = raw.upgrade() { 184 r.queue.push_back(runnable); 185 r.wake(); 186 } 187 }; 188 let (runnable, task) = async_task::spawn_local(f, schedule); 189 runnable.schedule(); 190 Re::wrap_task_handle(RawTaskHandle { 191 task, 192 raw: Arc::downgrade(self), 193 }) 194 } 195 spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,196 fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> 197 where 198 F: FnOnce() -> R + Send + 'static, 199 R: Send + 'static, 200 { 201 self.spawn(self.blocking_pool.spawn(f)) 202 } 203 run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>204 fn run_until<F: 
Future>(&self, f: F) -> AsyncResult<F::Output> { 205 let waker = super::waker::new_waker(Arc::downgrade(self)); 206 let mut ctx = Context::from_waker(&waker); 207 208 self.run_internal(&mut ctx, f) 209 } 210 } 211 212 impl<Re: Reactor + AsRawDescriptors> AsRawDescriptors for RawExecutor<Re> { as_raw_descriptors(&self) -> Vec<RawDescriptor>213 fn as_raw_descriptors(&self) -> Vec<RawDescriptor> { 214 self.reactor.as_raw_descriptors() 215 } 216 } 217 218 impl<Re: Reactor> WeakWake for RawExecutor<Re> { wake_by_ref(weak_self: &Weak<Self>)219 fn wake_by_ref(weak_self: &Weak<Self>) { 220 if let Some(arc_self) = weak_self.upgrade() { 221 RawExecutor::wake(&arc_self); 222 } 223 } 224 } 225 226 impl<Re: Reactor> Drop for RawExecutor<Re> { drop(&mut self)227 fn drop(&mut self) { 228 let final_future = self.reactor.on_executor_drop(); 229 230 let waker = noop_waker(); 231 let mut cx = Context::from_waker(&waker); 232 if let Err(e) = self.run_internal(&mut cx, final_future) { 233 warn!("Failed to drive RawExecutor to completion: {}", e); 234 } 235 } 236 } 237 238 pub struct RawTaskHandle<Re: Reactor + 'static, R> { 239 task: Task<R>, 240 raw: Weak<RawExecutor<Re>>, 241 } 242 243 impl<Re: Reactor, R: Send + 'static> RawTaskHandle<Re, R> { detach(self)244 pub fn detach(self) { 245 if let Some(raw) = self.raw.upgrade() { 246 raw.detached_tasks.lock().push(self.task); 247 } 248 } 249 cancel(self) -> Option<R>250 pub async fn cancel(self) -> Option<R> { 251 self.task.cancel().await 252 } 253 } 254 255 impl<Re: Reactor, R: 'static> Future for RawTaskHandle<Re, R> { 256 type Output = R; 257 poll( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context, ) -> std::task::Poll<Self::Output>258 fn poll( 259 mut self: std::pin::Pin<&mut Self>, 260 cx: &mut std::task::Context, 261 ) -> std::task::Poll<Self::Output> { 262 Pin::new(&mut self.task).poll(cx) 263 } 264 } 265