use crate::api::icd::*;
use crate::api::types::*;
use crate::core::context::*;
use crate::core::queue::*;
use crate::impl_cl_type_trait;

use mesa_rust::pipe::query::*;
use mesa_rust_gen::*;
use mesa_rust_util::static_assert;
use rusticl_opencl_gen::*;

use std::collections::HashSet;
use std::mem;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Duration;

// we assert that these are a continuous range of numbers so we won't have to use HashMaps
static_assert!(CL_COMPLETE == 0);
static_assert!(CL_RUNNING == 1);
static_assert!(CL_SUBMITTED == 2);
static_assert!(CL_QUEUED == 3);

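/// The work attached to an [`Event`]: a one-shot closure that submits the actual command through
/// the queue's [`QueueContext`] once the event is processed by [`Event::call`].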
pub type EventSig = Box<dyn FnOnce(&Arc<Queue>, &QueueContext) -> CLResult<()> + Send + Sync>;

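/// Selects which profiling timestamp of an event to read or write. The discriminants match the
/// corresponding `CL_PROFILING_COMMAND_*` values.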
pub enum EventTimes {
    Queued = CL_PROFILING_COMMAND_QUEUED as isize,
    Submit = CL_PROFILING_COMMAND_SUBMIT as isize,
    Start = CL_PROFILING_COMMAND_START as isize,
    End = CL_PROFILING_COMMAND_END as isize,
}

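/// The mutable part of an [`Event`], guarded by its `state` mutex: the execution status, the
/// registered callbacks (indexed by the status they were registered for, hence the
/// `static_assert`s above), the pending work closure and the profiling timestamps.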
#[derive(Default)]
struct EventMutState {
    status: cl_int,
    cbs: [Vec<EventCB>; 3],
    work: Option<EventSig>,
    time_queued: cl_ulong,
    time_submit: cl_ulong,
    time_start: cl_ulong,
    time_end: cl_ulong,
}

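/// An OpenCL event. Events created through [`Event::new`] belong to a [`Queue`] and carry the
/// work to execute, while user events from [`Event::new_user`] have no queue and use
/// `CL_COMMAND_USER` as their command type.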
pub struct Event {
    pub base: CLObjectBase<CL_INVALID_EVENT>,
    pub context: Arc<Context>,
    pub queue: Option<Arc<Queue>>,
    pub cmd_type: cl_command_type,
    pub deps: Vec<Arc<Event>>,
    state: Mutex<EventMutState>,
    cv: Condvar,
}

impl_cl_type_trait!(cl_event, Event, CL_INVALID_EVENT);

impl Event {
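    /// Creates an event for a command enqueued on `queue`. The event starts out in the
    /// `CL_QUEUED` state and holds on to `work` until it gets executed by [`Event::call`].
    ///
    /// A rough usage sketch (queue, deps and the command type are illustrative only; not a
    /// doctest):
    ///
    /// ```ignore
    /// let event = Event::new(
    ///     &queue,
    ///     CL_COMMAND_NDRANGE_KERNEL,
    ///     deps,
    ///     Box::new(|_queue, ctx| {
    ///         // submit the actual work to the device through `ctx`
    ///         Ok(())
    ///     }),
    /// );
    /// ```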
    pub fn new(
        queue: &Arc<Queue>,
        cmd_type: cl_command_type,
        deps: Vec<Arc<Event>>,
        work: EventSig,
    ) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context: queue.context.clone(),
            queue: Some(queue.clone()),
            cmd_type: cmd_type,
            deps: deps,
            state: Mutex::new(EventMutState {
                status: CL_QUEUED as cl_int,
                work: Some(work),
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

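    /// Creates a user event (`CL_COMMAND_USER`). User events are not attached to a queue and
    /// start out in the `CL_SUBMITTED` state; their status is driven by
    /// [`Event::set_user_status`].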
    pub fn new_user(context: Arc<Context>) -> Arc<Event> {
        Arc::new(Self {
            base: CLObjectBase::new(RusticlTypes::Event),
            context: context,
            queue: None,
            cmd_type: CL_COMMAND_USER,
            deps: Vec::new(),
            state: Mutex::new(EventMutState {
                status: CL_SUBMITTED as cl_int,
                ..Default::default()
            }),
            cv: Condvar::new(),
        })
    }

    fn state(&self) -> MutexGuard<EventMutState> {
        self.state.lock().unwrap()
    }

    pub fn status(&self) -> cl_int {
        self.state().status
    }

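    /// Updates the event status and fires all callbacks registered via [`Event::add_cb`] for the
    /// states that have now been reached. A negative `new` marks the event as failed; per the CL
    /// spec quoted below, the error code is then passed to the callbacks instead of the state
    /// they were registered for. Completion (or an error) also wakes up waiters in
    /// [`Event::wait`].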
    fn set_status(&self, mut lock: MutexGuard<EventMutState>, new: cl_int) {
        lock.status = new;

        // signal on completion or an error
        if new <= CL_COMPLETE as cl_int {
            self.cv.notify_all();
        }

        // errors are treated as CL_COMPLETE
        let cb_max = if new < 0 { CL_COMPLETE } else { new as u32 };

        // there are only callbacks for those
        if ![CL_COMPLETE, CL_RUNNING, CL_SUBMITTED].contains(&cb_max) {
            return;
        }

        let mut cbs = Vec::new();
        // Collect all cbs we need to call first. Technically it is not required to call them in
        // order, but let's be nice to applications as it's for free
        for idx in (cb_max..=CL_SUBMITTED).rev() {
            cbs.extend(
                // use mem::take as each callback is only supposed to be called exactly once
                mem::take(&mut lock.cbs[idx as usize])
                    .into_iter()
                    // we need to save the status this cb was registered with
                    .map(|cb| (idx as cl_int, cb)),
            );
        }

        // applications might want to access the event in the callback, so drop the lock before
        // calling into the callbacks.
        drop(lock);

        for (idx, cb) in cbs {
            // from the CL spec:
            //
            // event_command_status is equal to the command_exec_callback_type used while
            // registering the callback. [...] If the callback is called as the result of the
            // command associated with event being abnormally terminated, an appropriate error code
            // for the error that caused the termination will be passed to event_command_status
            // instead.
            let status = if new < 0 { new } else { idx };
            cb.call(self, status);
        }
    }

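    /// Sets the status of a user event, typically `CL_COMPLETE` or a negative error code as
    /// allowed for `clSetUserEventStatus`.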
    pub fn set_user_status(&self, status: cl_int) {
        self.set_status(self.state(), status);
    }

    pub fn is_error(&self) -> bool {
        self.status() < 0
    }

    pub fn is_user(&self) -> bool {
        self.cmd_type == CL_COMMAND_USER
    }

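    /// Stores one of the profiling timestamps on the event.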
    pub fn set_time(&self, which: EventTimes, value: cl_ulong) {
        let mut lock = self.state();
        match which {
            EventTimes::Queued => lock.time_queued = value,
            EventTimes::Submit => lock.time_submit = value,
            EventTimes::Start => lock.time_start = value,
            EventTimes::End => lock.time_end = value,
        }
    }

    pub fn get_time(&self, which: EventTimes) -> cl_ulong {
        let lock = self.state();

        match which {
            EventTimes::Queued => lock.time_queued,
            EventTimes::Submit => lock.time_submit,
            EventTimes::Start => lock.time_start,
            EventTimes::End => lock.time_end,
        }
    }

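    /// Registers a callback for `state` (`CL_SUBMITTED`, `CL_RUNNING` or `CL_COMPLETE`). If that
    /// state was already reached (or passed), the callback fires immediately; otherwise it is
    /// stored and fired later by [`Event::set_status`].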
    pub fn add_cb(&self, state: cl_int, cb: EventCB) {
        let mut lock = self.state();
        let status = lock.status;

        // call cb if the status was already reached
        if state >= status {
            drop(lock);
            cb.call(self, state);
        } else {
            lock.cbs.get_mut(state as usize).unwrap().push(cb);
        }
    }

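    /// Transitions a `CL_SUBMITTED` event through `CL_RUNNING` to `CL_COMPLETE`. Events that
    /// already carry an error status are left untouched.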
    pub(super) fn signal(&self) {
        let state = self.state();
        // we don't want to call signal on errored events, but if that still happens, handle it
        // gracefully
        debug_assert_eq!(state.status, CL_SUBMITTED as cl_int);
        if state.status < 0 {
            return;
        }
        self.set_status(state, CL_RUNNING as cl_int);
        self.set_status(self.state(), CL_COMPLETE as cl_int);
    }

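    /// Blocks until the event reaches `CL_COMPLETE` or an error status and returns that status.
    /// The condition variable is waited on with a one second timeout, re-checking the status on
    /// every wakeup.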
    pub fn wait(&self) -> cl_int {
        let mut lock = self.state();
        while lock.status >= CL_RUNNING as cl_int {
            lock = self
                .cv
                .wait_timeout(lock, Duration::from_secs(1))
                .unwrap()
                .0;
        }
        lock.status
    }

    // We always assume that the work here simply submits stuff to the hardware, even if it's just
    // doing software emulation or nothing at all.
    // If anything requests waiting, we will update the status through fencing later.
    pub fn call(&self, ctx: &QueueContext) -> cl_int {
        let mut lock = self.state();
        let mut status = lock.status;
        let queue = self.queue.as_ref().unwrap();
        let profiling_enabled = queue.is_profiling_enabled();
        if status == CL_QUEUED as cl_int {
            if profiling_enabled {
                // We already have the lock so can't call set_time on the event
                lock.time_submit = queue.device.screen().get_timestamp();
            }
            let mut query_start = None;
            let mut query_end = None;
            status = lock.work.take().map_or(
                // if there is no work
                CL_SUBMITTED as cl_int,
                |w| {
                    if profiling_enabled {
                        query_start =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }

                    let res = w(queue, ctx).err().map_or(
                        // return the error if there is one
                        CL_SUBMITTED as cl_int,
                        |e| e,
                    );
                    if profiling_enabled {
                        query_end =
                            PipeQueryGen::<{ pipe_query_type::PIPE_QUERY_TIMESTAMP }>::new(ctx);
                    }
                    res
                },
            );

            if profiling_enabled {
                lock.time_start = query_start.unwrap().read_blocked();
                lock.time_end = query_end.unwrap().read_blocked();
            }
            self.set_status(lock, status);
        }
        status
    }

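    /// Recursively collects this event and its dependencies into `result`, stopping at events
    /// that have already been flushed (status `CL_SUBMITTED` or lower) and at events that were
    /// already visited.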
    fn deep_unflushed_deps_impl<'a>(&'a self, result: &mut HashSet<&'a Event>) {
        if self.status() <= CL_SUBMITTED as i32 {
            return;
        }

        // only scan dependencies if it's a new one
        if result.insert(self) {
            for e in &self.deps {
                e.deep_unflushed_deps_impl(result);
            }
        }
    }

    /// does a deep search and returns a list of all dependencies including `events` which haven't
    /// been flushed out yet
    pub fn deep_unflushed_deps(events: &[Arc<Event>]) -> HashSet<&Event> {
        let mut result = HashSet::new();

        for e in events {
            e.deep_unflushed_deps_impl(&mut result);
        }

        result
    }

    /// does a deep search and returns a list of all queues which haven't been flushed yet
    pub fn deep_unflushed_queues(events: &[Arc<Event>]) -> HashSet<Arc<Queue>> {
        Event::deep_unflushed_deps(events)
            .iter()
            .filter_map(|e| e.queue.clone())
            .collect()
    }
}

impl Drop for Event {
    // implement drop in order to prevent stack overflows of long dependency chains.
    //
    // This abuses the fact that `Arc::into_inner` only succeeds when there is one strong reference
    // so we turn a recursive drop chain into a drop list for events having no other references.
    fn drop(&mut self) {
        if self.deps.is_empty() {
            return;
        }

        let mut deps_list = vec![mem::take(&mut self.deps)];
        while let Some(deps) = deps_list.pop() {
            for dep in deps {
                if let Some(mut dep) = Arc::into_inner(dep) {
                    deps_list.push(mem::take(&mut dep.deps));
                }
            }
        }
    }
}

// TODO worker thread per device
// Condvar to wait on new events to work on
// notify condvar when flushing queue events to worker
// attach fence to flushed events on context->flush
// store "newest" event for in-order queues per queue
// reordering/graph building done in worker