use crate::api::icd::*;
use crate::api::types::*;
use crate::core::context::*;
use crate::core::queue::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::query::*;
use mesa_rust_gen::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;

use std::collections::HashSet;
use std::mem;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Duration;

// we assert that those are a continuous range of numbers so we won't have to use HashMaps
static_assert!(CL_COMPLETE == 0);
static_assert!(CL_RUNNING == 1);
static_assert!(CL_SUBMITTED == 2);
static_assert!(CL_QUEUED == 3);

pub type EventSig = Box<dyn FnOnce(&Arc<Queue>, &QueueContext) -> CLResult<()> + Send + Sync>;

pub enum EventTimes {
    Queued = CL_PROFILING_COMMAND_QUEUED as isize,
    Submit = CL_PROFILING_COMMAND_SUBMIT as isize,
    Start = CL_PROFILING_COMMAND_START as isize,
    End = CL_PROFILING_COMMAND_END as isize,
}

#[derive(Default)]
struct EventMutState {
    status: cl_int,
    cbs: [Vec<EventCB>; 3],
    work: Option<EventSig>,
    time_queued: cl_ulong,
    time_submit: cl_ulong,
    time_start: cl_ulong,
    time_end: cl_ulong,
}

pub struct Event {
    pub base: CLObjectBase<CL_INVALID_EVENT>,
    pub context: Arc<Context>,
    pub queue: Option<Arc<Queue>>,
    pub cmd_type: cl_command_type,
    pub deps: Vec<Arc<Event>>,
    state: Mutex<EventMutState>,
    cv: Condvar,
}

impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);

impl Event {
    pub fn new(
        queue: &Arc<Queue>,
        cmd_type: cl_command_type,
        deps: Vec<Arc<Event>>,
        work: EventSig,
    ) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context: queue.context.clone(),
            queue: Some(queue.clone()),
            cmd_type,
            deps,
            state: Mutex::new(EventMutState {
                status: CL_QUEUED as cl_int,
                work: Some(work),
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

    pub fn new_user(context: Arc<Context>) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context,
            queue: None,
            cmd_type: CL_COMMAND_USER,
            deps: Vec::new(),
            state: Mutex::new(EventMutState {
                status: CL_SUBMITTED as cl_int,
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

    fn state(&self) -> MutexGuard<EventMutState> {
        self.state.lock().unwrap()
    }

    pub fn status(&self) -> cl_int {
        self.state().status
    }

    fn set_status(&self, mut lock: MutexGuard<EventMutState>, new: cl_int) {
        lock.status = new;

        // signal on completion or an error
        if new <= CL_COMPLETE as cl_int {
            self.cv.notify_all();
        }

        // errors we treat as CL_COMPLETE
        let cb_max = if new < 0 { CL_COMPLETE } else { new as u32 };

        // there are only callbacks for those
        if ![CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&cb_max) {
            return;
        }

        let mut cbs = Vec::new();
        // Collect all cbs we need to call first.
        // Technically it is not required to call them in order, but let's be nice to
        // applications as it's for free.
        for idx in (cb_max..=CL_SUBMITTED).rev() {
            cbs.extend(
                // use mem::take as each callback is only supposed to be called exactly once
                mem::take(&mut lock.cbs[idx as usize])
                    .into_iter()
                    // we need to save the status this cb was registered with
                    .map(|cb| (idx as cl_int, cb)),
            );
        }

        // applications might want to access the event in the callback, so drop the lock before
        // calling into the callbacks.
        drop(lock);

        for (idx, cb) in cbs {
            // from the CL spec:
            //
            //    event_command_status is equal to the command_exec_callback_type used while
            //    registering the callback. [...] If the callback is called as the result of the
            //    command associated with event being abnormally terminated, an appropriate error
            //    code for the error that caused the termination will be passed to
            //    event_command_status instead.
            let status = if new < 0 { new } else { idx };
            cb.call(self, status);
        }
    }

    pub fn set_user_status(&self, status: cl_int) {
        self.set_status(self.state(), status);
    }

    pub fn is_error(&self) -> bool {
        self.status() < 0
    }

    pub fn is_user(&self) -> bool {
        self.cmd_type == CL_COMMAND_USER
    }

    pub fn set_time(&self, which: EventTimes, value: cl_ulong) {
        let mut lock = self.state();
        match which {
            EventTimes::Queued => lock.time_queued = value,
            EventTimes::Submit => lock.time_submit = value,
            EventTimes::Start => lock.time_start = value,
            EventTimes::End => lock.time_end = value,
        }
    }

    pub fn get_time(&self, which: EventTimes) -> cl_ulong {
        let lock = self.state();

        match which {
            EventTimes::Queued => lock.time_queued,
            EventTimes::Submit => lock.time_submit,
            EventTimes::Start => lock.time_start,
            EventTimes::End => lock.time_end,
        }
    }

    pub fn add_cb(&self, state: cl_int, cb: EventCB) {
        let mut lock = self.state();
        let status = lock.status;

        // call the cb right away if the status was already reached
        if state >= status {
            drop(lock);
            cb.call(self, state);
        } else {
            lock.cbs.get_mut(state as usize).unwrap().push(cb);
        }
    }

    pub(super) fn signal(&self) {
        let state = self.state();
        // we don't want to call signal on errored events, but if that still happens, handle it
        // gracefully
        debug_assert_eq!(state.status, CL_SUBMITTED as cl_int);
        if state.status < 0 {
            return;
        }
        self.set_status(state, CL_RUNNING as cl_int);
        self.set_status(self.state(), CL_COMPLETE as cl_int);
    }

    pub fn wait(&self) -> cl_int {
        let mut lock = self.state();
        // re-check in one second intervals until the status indicates the command completed or
        // failed
        while lock.status >= CL_RUNNING as cl_int {
            lock = self
                .cv
                .wait_timeout(lock, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        lock.status
    }

    // We always assume that work here simply submits stuff to the hardware even if it's just doing
    // sw emulation or nothing at all.
    // If anything requests waiting, we will update the status through fencing later.
    pub fn call(&self, ctx: &QueueContext) -> cl_int {
        let mut lock = self.state();
        let mut status = lock.status;
        let queue = self.queue.as_ref().unwrap();
        let profiling_enabled = queue.is_profiling_enabled();
        if status == CL_QUEUED as cl_int {
            if profiling_enabled {
                // We already have the lock so can't call set_time on the event
                lock.time_submit = queue.device.screen().get_timestamp();
            }
            let mut query_start = None;
            let mut query_end = None;
            status = lock.work.take().map_or(
                // if there is no work
                CL_SUBMITTED as cl_int,
                |w| {
                    if profiling_enabled {
                        query_start =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }

                    let res = w(queue, ctx).err().map_or(
                        // return the error if there is one
                        CL_SUBMITTED as cl_int,
                        |e| e,
                    );
                    if profiling_enabled {
                        query_end =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }
                    res
                },
            );

            if profiling_enabled {
                lock.time_start = query_start.unwrap().read_blocked();
                lock.time_end = query_end.unwrap().read_blocked();
            }
            self.set_status(lock, status);
        }
        status
    }

    fn deep_unflushed_deps_impl<'a>(&'a self, result: &mut HashSet<&'a Event>) {
        if self.status() <= CL_SUBMITTED as i32 {
            return;
        }

        // only scan dependencies if it's a new one
        if result.insert(self) {
            for e in &self.deps {
                e.deep_unflushed_deps_impl(result);
            }
        }
    }

    /// does a deep search and returns a list of all dependencies including `events` which haven't
    /// been flushed out yet
    pub fn deep_unflushed_deps(events: &[Arc<Event>]) -> HashSet<&Event> {
        let mut result = HashSet::new();

        for e in events {
            e.deep_unflushed_deps_impl(&mut result);
        }

        result
    }

    /// does a deep search and returns a list of all queues which haven't been flushed yet
    pub fn deep_unflushed_queues(events: &[Arc<Event>]) -> HashSet<Arc<Queue>> {
        Event::deep_unflushed_deps(events)
            .iter()
            .filter_map(|e| e.queue.clone())
            .collect()
    }
}

impl Drop for Event {
    // implement drop in order to prevent stack overflows of long dependency chains.
    //
    // This abuses the fact that `Arc::into_inner` only succeeds when there is one strong reference
    // so we turn a recursive drop chain into a drop list for events having no other references.
    fn drop(&mut self) {
        if self.deps.is_empty() {
            return;
        }

        let mut deps_list = vec![mem::take(&mut self.deps)];
        while let Some(deps) = deps_list.pop() {
            for dep in deps {
                if let Some(mut dep) = Arc::into_inner(dep) {
                    deps_list.push(mem::take(&mut dep.deps));
                }
            }
        }
    }
}

// TODO worker thread per device
// Condvar to wait on new events to work on
// notify condvar when flushing queue events to worker
// attach fence to flushed events on context->flush
// store "newest" event for in-order queues per queue
// reordering/graph building done in worker
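
// Editorial sketch, not part of the original module: a minimal, self-contained
// illustration of the iterative drop pattern used in `impl Drop for Event` above.
// The `Node` type here is hypothetical and only exists to show the technique with
// plain std types: children of uniquely-owned `Arc`s are hoisted into a work list,
// so a long dependency chain is torn down in a loop rather than by recursive drops
// that could overflow the stack.
#[cfg(test)]
mod drop_chain_sketch {
    use std::mem;
    use std::sync::Arc;

    struct Node {
        deps: Vec<Arc<Node>>,
    }

    impl Drop for Node {
        fn drop(&mut self) {
            // same shape as Event::drop: take our deps and process them iteratively.
            // Arc::into_inner only succeeds for the last strong reference, so nodes
            // still referenced elsewhere are left alone.
            let mut deps_list = vec![mem::take(&mut self.deps)];
            while let Some(deps) = deps_list.pop() {
                for dep in deps {
                    if let Some(mut dep) = Arc::into_inner(dep) {
                        // `dep.deps` is emptied here, so dropping `dep` at the end of
                        // this block doesn't recurse any further.
                        deps_list.push(mem::take(&mut dep.deps));
                    }
                }
            }
        }
    }

    #[test]
    fn long_chain_drops_without_recursion() {
        // build a chain of 100_000 nodes, each depending on the previous one
        let mut head = Arc::new(Node { deps: Vec::new() });
        for _ in 0..100_000 {
            head = Arc::new(Node { deps: vec![head] });
        }
        // dropping the head walks the chain with the loop above instead of one
        // recursive drop frame per node
        drop(head);
    }
}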