/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include "c11/threads.h"
#include "util/u_cpu_detect.h"
#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"
#include "util/timespec.h"
#include "u_process.h"

#if defined(__linux__)
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#endif


/* 256MB cap on the total size of queued job payloads, used when deciding
 * whether a full queue may grow (see UTIL_QUEUE_INIT_RESIZE_IF_FULL below).
 */
#define S_256MB (256 * 1024 * 1024)

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool locked);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list = {
   .next = &queue_list,
   .prev = &queue_list,
};
static mtx_t exit_mutex;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0, false);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   mtx_init(&exit_mutex, mtx_plain);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   list_add(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         list_del(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
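/* Reading do_futex_fence_wait() below, fence->val acts as a small state
 * machine: 0 means signalled, 1 means unsignalled with no waiters, and 2
 * means unsignalled with at least one waiter (so the signalling side knows
 * a futex wake is needed). Waiters promote 1 -> 2 with a cmpxchg before
 * sleeping on the futex. This description is inferred from the code in this
 * file; the authoritative definition of util_queue_fence lives in u_queue.h.
 */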
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = p_atomic_read_relaxed(&fence->val);
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = p_atomic_read_relaxed(&fence->val);
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   u_cnd_monotonic_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      u_cnd_monotonic_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

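/* Note on abs_timeout in both fence implementations: judging from
 * timespec_from_nsec() and u_cnd_monotonic_timedwait(), it appears to be an
 * absolute point in time on the monotonic clock, expressed in nanoseconds,
 * rather than a relative duration.
 */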
bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   struct timespec ts;
   timespec_from_nsec(&ts, abs_timeout);

   mtx_lock(&fence->mutex);
   while (!fence->signalled) {
      if (u_cnd_monotonic_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
         break;
   }
   mtx_unlock(&fence->mutex);

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   u_cnd_monotonic_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   u_cnd_monotonic_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif

/****************************************************************************
 * util_queue implementation
 */

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

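/* Worker thread entry point. Each worker loops: sleep until a job is queued,
 * pop the oldest job, run its execute callback, signal its fence, then run
 * its cleanup callback. A worker exits as soon as its thread_index is no
 * longer below queue->num_threads, which is how the queue shrinks or shuts
 * down its thread pool.
 */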
static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
      uint32_t mask[UTIL_MAX_CPUS / 32];

      memset(mask, 0xff, sizeof(mask));

      util_set_current_thread_affinity(mask, NULL,
                                       util_get_cpu_caps()->num_cpu_mask_bits);
   }

#if defined(__linux__)
   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
      /* The nice() function can only set a maximum of 19. */
      setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
   }
#endif

   if (strlen(queue->name) > 0) {
      char name[16];
      snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      if (job.job)
         queue->total_jobs_size -= job.job_size;
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, job.global_data, thread_index);
         if (job.fence)
            util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, job.global_data, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            if (queue->jobs[i].fence)
               util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}

static bool
util_queue_create_thread(struct util_queue *queue, unsigned index)
{
   struct thread_input *input =
      (struct thread_input *) malloc(sizeof(struct thread_input));
   if (!input)
      return false;

   input->queue = queue;
   input->thread_index = index;

   if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
      free(input);
      return false;
   }

   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_BATCH)
      struct sched_param sched_param = {0};

      /* The nice() function can only set a maximum of 19.
       * SCHED_BATCH gives the scheduler a hint that this is a latency
       * insensitive thread.
       *
       * Note that Linux only allows decreasing the priority. The original
       * priority can't be restored.
       */
      pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
#endif
   }
   return true;
}

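/* Resize the thread pool to num_threads, clamped to [1, queue->max_threads].
 * Growing spawns additional workers; shrinking lowers num_threads so the
 * excess workers exit their loop (see util_queue_kill_threads, which also
 * joins them). Pass locked = true if the caller already holds queue->lock.
 */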
void
util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads,
                              bool locked)
{
   num_threads = MIN2(num_threads, queue->max_threads);
   num_threads = MAX2(num_threads, 1);

   if (!locked)
      mtx_lock(&queue->lock);

   unsigned old_num_threads = queue->num_threads;

   if (num_threads == old_num_threads) {
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   if (num_threads < old_num_threads) {
      util_queue_kill_threads(queue, num_threads, true);
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   /* Create threads.
    *
    * We need to update num_threads first, because threads terminate
    * when thread_index >= num_threads.
    */
   queue->num_threads = num_threads;
   for (unsigned i = old_num_threads; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         queue->num_threads = i;
         break;
      }
   }

   if (!locked)
      mtx_unlock(&queue->lock);
}

bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags,
                void *global_data)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
               process_len, process_name, name);
   } else {
      snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->create_threads_on_demand = true;
   queue->flags = flags;
   queue->max_threads = num_threads;
   queue->num_threads = 1;
   queue->max_jobs = max_jobs;
   queue->global_data = global_data;

   (void) mtx_init(&queue->lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < queue->num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* util_queue_is_initialized can also be used to check for success */
   memset(queue, 0, sizeof(*queue));
   return false;
}
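
/* A minimal usage sketch, for illustration only: job_type, run_job and the
 * chosen sizes/flags are hypothetical, while the util_queue and
 * util_queue_fence calls are the ones defined in this file and in u_queue.h.
 *
 *    static void run_job(void *job, void *global_data, int thread_index)
 *    {
 *       struct job_type *j = job;
 *       // ... do the work on a worker thread ...
 *    }
 *
 *    struct util_queue q;
 *    struct util_queue_fence f;
 *    struct job_type job;            // filled in by the caller
 *
 *    util_queue_init(&q, "mesa_q", 32, 2, UTIL_QUEUE_INIT_RESIZE_IF_FULL,
 *                    NULL);
 *    util_queue_fence_init(&f);
 *    util_queue_add_job(&q, &job, &f, run_job, NULL, 0);
 *    util_queue_fence_wait(&f);      // block until run_job has finished
 *    util_queue_fence_destroy(&f);
 *    util_queue_destroy(&q);
 */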

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool locked)
{
   /* Signal all threads to terminate. */
   if (!locked)
      mtx_lock(&queue->lock);

   if (keep_num_threads >= queue->num_threads) {
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   unsigned old_num_threads = queue->num_threads;
   /* Setting num_threads is what causes the threads to terminate.
    * Then cnd_broadcast wakes them up and they will exit their function.
    */
   queue->num_threads = keep_num_threads;
   cnd_broadcast(&queue->has_queued_cond);

   /* Wait for threads to terminate. */
   if (keep_num_threads < old_num_threads) {
      /* We need to unlock the mutex to allow threads to terminate. */
      mtx_unlock(&queue->lock);
      for (unsigned i = keep_num_threads; i < old_num_threads; i++)
         thrd_join(queue->threads[i], NULL);
      if (locked)
         mtx_lock(&queue->lock);
   } else {
      if (!locked)
         mtx_unlock(&queue->lock);
   }
}

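/* Internal job used by util_queue_finish(): one copy is queued per worker
 * thread, and each copy blocks in util_barrier_wait() until every worker has
 * reached the barrier, which guarantees that all previously queued jobs have
 * finished. The one thread for which util_barrier_wait() returns true
 * destroys the barrier.
 */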
static void
util_queue_finish_execute(void *data, void *gdata, int num_thread)
{
   util_barrier *barrier = data;
   if (util_barrier_wait(barrier))
      util_barrier_destroy(barrier);
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0, false);

   /* This makes it safe to call on a queue that failed util_queue_init. */
   if (queue->head.next != NULL)
      remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

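/* Enqueue one job into the ring buffer. If the queue is full, either grow
 * the ring by 8 slots (when UTIL_QUEUE_INIT_RESIZE_IF_FULL is set and the
 * total size of queued jobs stays under S_256MB) or block on has_space_cond
 * until a worker frees a slot. With locked = true the caller already holds
 * queue->lock.
 */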
static void
util_queue_add_job_locked(struct util_queue *queue,
                          void *job,
                          struct util_queue_fence *fence,
                          util_queue_execute_func execute,
                          util_queue_execute_func cleanup,
                          const size_t job_size,
                          bool locked)
{
   struct util_queue_job *ptr;

   if (!locked)
      mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      if (!locked)
         mtx_unlock(&queue->lock);
      /* No good option here, but any leaks will be short-lived as things
       * are shutting down.
       */
      return;
   }

   if (fence)
      util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* Scale the number of threads up if there's already one job waiting. */
   if (queue->num_queued > 0 &&
       queue->create_threads_on_demand &&
       execute != util_queue_finish_execute &&
       queue->num_threads < queue->max_threads) {
      util_queue_adjust_num_threads(queue, queue->num_threads + 1, true);
   }

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
          queue->total_jobs_size + job_size < S_256MB) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->global_data = queue->global_data;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   ptr->job_size = job_size;

   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
   queue->total_jobs_size += ptr->job_size;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   if (!locked)
      mtx_unlock(&queue->lock);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup,
                   const size_t job_size)
{
   util_queue_add_job_locked(queue, job, fence, execute, cleanup, job_size,
                             false);
}

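/* Callback semantics for util_queue_add_job(), as implemented by the worker
 * loop above: execute runs on a worker thread, the fence (if any) is
 * signalled right after execute returns, and cleanup (if any) runs last,
 * also on the worker. util_queue_drop_job() below may instead invoke cleanup
 * with thread_index = -1 if the job never started. job_size is only used for
 * the S_256MB accounting of the resize-if-full path.
 */
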
/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   mtx_lock(&queue->lock);

   /* The number of threads can be changed to 0, e.g. by the atexit handler. */
   if (!queue->num_threads) {
      mtx_unlock(&queue->lock);
      return;
   }

   /* We need to disable adding new threads in util_queue_add_job because
    * the finish operation requires a fixed number of threads.
    *
    * Also note that util_queue_add_job can unlock the mutex and wait for
    * space if the queue is full.
    */
   queue->create_threads_on_demand = false;

   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job_locked(queue, &barrier, &fences[i],
                                util_queue_finish_execute, NULL, 0, true);
   }
   queue->create_threads_on_demand = true;
   mtx_unlock(&queue->lock);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }

   free(fences);
}

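/* Return the CPU time consumed so far by the given worker thread, in
 * nanoseconds, or 0 if thread_index is out of range.
 */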
int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return util_thread_get_time_nano(queue->threads[thread_index]);
}