xref: /aosp_15_r20/external/mesa3d/src/util/u_queue.c (revision 6104692788411f58d303aa86923a9ff6ecaded22)
1 /*
2  * Copyright © 2016 Advanced Micro Devices, Inc.
3  * All Rights Reserved.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sub license, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16  * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17  * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20  * USE OR OTHER DEALINGS IN THE SOFTWARE.
21  *
22  * The above copyright notice and this permission notice (including the
23  * next paragraph) shall be included in all copies or substantial portions
24  * of the Software.
25  */
26 
27 #include "u_queue.h"
28 
29 #include "c11/threads.h"
30 #include "util/u_cpu_detect.h"
31 #include "util/os_time.h"
32 #include "util/u_string.h"
33 #include "util/u_thread.h"
34 #include "util/timespec.h"
35 #include "u_process.h"
36 
37 #if defined(__linux__)
38 #include <sys/time.h>
39 #include <sys/resource.h>
40 #include <sys/syscall.h>
41 #endif
42 
43 
/* 256 MiB in bytes: ceiling on the accumulated job size below which a full
 * queue is still allowed to grow (see util_queue_add_job_locked).
 */
#define S_256MB (256 * 1024 * 1024)
46 
47 static void
48 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
49                         bool locked);
50 
51 /****************************************************************************
52  * Wait for all queues to assert idle when exit() is called.
53  *
54  * Otherwise, C++ static variable destructors can be called while threads
55  * are using the static variables.
56  */
57 
58 static once_flag atexit_once_flag = ONCE_FLAG_INIT;
59 static struct list_head queue_list = {
60    .next = &queue_list,
61    .prev = &queue_list,
62 };
63 static mtx_t exit_mutex;
64 
65 static void
atexit_handler(void)66 atexit_handler(void)
67 {
68    struct util_queue *iter;
69 
70    mtx_lock(&exit_mutex);
71    /* Wait for all queues to assert idle. */
72    LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
73       util_queue_kill_threads(iter, 0, false);
74    }
75    mtx_unlock(&exit_mutex);
76 }
77 
78 static void
global_init(void)79 global_init(void)
80 {
81    mtx_init(&exit_mutex, mtx_plain);
82    atexit(atexit_handler);
83 }
84 
85 static void
add_to_atexit_list(struct util_queue * queue)86 add_to_atexit_list(struct util_queue *queue)
87 {
88    call_once(&atexit_once_flag, global_init);
89 
90    mtx_lock(&exit_mutex);
91    list_add(&queue->head, &queue_list);
92    mtx_unlock(&exit_mutex);
93 }
94 
95 static void
remove_from_atexit_list(struct util_queue * queue)96 remove_from_atexit_list(struct util_queue *queue)
97 {
98    struct util_queue *iter, *tmp;
99 
100    mtx_lock(&exit_mutex);
101    LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
102       if (iter == queue) {
103          list_del(&iter->head);
104          break;
105       }
106    }
107    mtx_unlock(&exit_mutex);
108 }
109 
110 /****************************************************************************
111  * util_queue_fence
112  */
113 
114 #ifdef UTIL_QUEUE_FENCE_FUTEX
115 static bool
do_futex_fence_wait(struct util_queue_fence * fence,bool timeout,int64_t abs_timeout)116 do_futex_fence_wait(struct util_queue_fence *fence,
117                     bool timeout, int64_t abs_timeout)
118 {
119    uint32_t v = p_atomic_read_relaxed(&fence->val);
120    struct timespec ts;
121    ts.tv_sec = abs_timeout / (1000*1000*1000);
122    ts.tv_nsec = abs_timeout % (1000*1000*1000);
123 
124    while (v != 0) {
125       if (v != 2) {
126          v = p_atomic_cmpxchg(&fence->val, 1, 2);
127          if (v == 0)
128             return true;
129       }
130 
131       int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
132       if (timeout && r < 0) {
133          if (errno == ETIMEDOUT)
134             return false;
135       }
136 
137       v = p_atomic_read_relaxed(&fence->val);
138    }
139 
140    return true;
141 }
142 
/**
 * Wait without a deadline until the futex-based fence is signalled.
 */
void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   /* With no timeout requested the helper's result carries no information. */
   (void) do_futex_fence_wait(fence, false, 0);
}
148 
/**
 * Wait for the futex-based fence with a deadline.
 *
 * \param abs_timeout  deadline in nanoseconds, forwarded unchanged
 * \return the result of do_futex_fence_wait: false when the wait timed out.
 */
bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   const bool signalled = do_futex_fence_wait(fence, true, abs_timeout);

   return signalled;
}
155 
156 #endif
157 
158 #ifdef UTIL_QUEUE_FENCE_STANDARD
159 void
util_queue_fence_signal(struct util_queue_fence * fence)160 util_queue_fence_signal(struct util_queue_fence *fence)
161 {
162    mtx_lock(&fence->mutex);
163    fence->signalled = true;
164    u_cnd_monotonic_broadcast(&fence->cond);
165    mtx_unlock(&fence->mutex);
166 }
167 
168 void
_util_queue_fence_wait(struct util_queue_fence * fence)169 _util_queue_fence_wait(struct util_queue_fence *fence)
170 {
171    mtx_lock(&fence->mutex);
172    while (!fence->signalled)
173       u_cnd_monotonic_wait(&fence->cond, &fence->mutex);
174    mtx_unlock(&fence->mutex);
175 }
176 
177 bool
_util_queue_fence_wait_timeout(struct util_queue_fence * fence,int64_t abs_timeout)178 _util_queue_fence_wait_timeout(struct util_queue_fence *fence,
179                                int64_t abs_timeout)
180 {
181    struct timespec ts;
182    timespec_from_nsec(&ts, abs_timeout);
183 
184    mtx_lock(&fence->mutex);
185    while (!fence->signalled) {
186       if (u_cnd_monotonic_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
187          break;
188    }
189    mtx_unlock(&fence->mutex);
190 
191    return fence->signalled;
192 }
193 
194 void
util_queue_fence_init(struct util_queue_fence * fence)195 util_queue_fence_init(struct util_queue_fence *fence)
196 {
197    memset(fence, 0, sizeof(*fence));
198    (void) mtx_init(&fence->mutex, mtx_plain);
199    u_cnd_monotonic_init(&fence->cond);
200    fence->signalled = true;
201 }
202 
203 void
util_queue_fence_destroy(struct util_queue_fence * fence)204 util_queue_fence_destroy(struct util_queue_fence *fence)
205 {
206    assert(fence->signalled);
207 
208    /* Ensure that another thread is not in the middle of
209     * util_queue_fence_signal (having set the fence to signalled but still
210     * holding the fence mutex).
211     *
212     * A common contract between threads is that as soon as a fence is signalled
213     * by thread A, thread B is allowed to destroy it. Since
214     * util_queue_fence_is_signalled does not lock the fence mutex (for
215     * performance reasons), we must do so here.
216     */
217    mtx_lock(&fence->mutex);
218    mtx_unlock(&fence->mutex);
219 
220    u_cnd_monotonic_destroy(&fence->cond);
221    mtx_destroy(&fence->mutex);
222 }
223 #endif
224 
225 /****************************************************************************
226  * util_queue implementation
227  */
228 
/* Start-up arguments handed to util_queue_thread_func. The block is
 * heap-allocated by util_queue_create_thread and freed by the new thread.
 */
struct thread_input {
   struct util_queue *queue;  /* queue the worker serves */
   int thread_index;          /* worker's slot in queue->threads */
};
233 
234 static int
util_queue_thread_func(void * input)235 util_queue_thread_func(void *input)
236 {
237    struct util_queue *queue = ((struct thread_input*)input)->queue;
238    int thread_index = ((struct thread_input*)input)->thread_index;
239 
240    free(input);
241 
242    if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
243       /* Don't inherit the thread affinity from the parent thread.
244        * Set the full mask.
245        */
246       uint32_t mask[UTIL_MAX_CPUS / 32];
247 
248       memset(mask, 0xff, sizeof(mask));
249 
250       util_set_current_thread_affinity(mask, NULL,
251                                        util_get_cpu_caps()->num_cpu_mask_bits);
252    }
253 
254 #if defined(__linux__)
255    if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
256       /* The nice() function can only set a maximum of 19. */
257       setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
258    }
259 #endif
260 
261    if (strlen(queue->name) > 0) {
262       char name[16];
263       snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
264       u_thread_setname(name);
265    }
266 
267    while (1) {
268       struct util_queue_job job;
269 
270       mtx_lock(&queue->lock);
271       assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
272 
273       /* wait if the queue is empty */
274       while (thread_index < queue->num_threads && queue->num_queued == 0)
275          cnd_wait(&queue->has_queued_cond, &queue->lock);
276 
277       /* only kill threads that are above "num_threads" */
278       if (thread_index >= queue->num_threads) {
279          mtx_unlock(&queue->lock);
280          break;
281       }
282 
283       job = queue->jobs[queue->read_idx];
284       memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
285       queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
286 
287       queue->num_queued--;
288       cnd_signal(&queue->has_space_cond);
289       if (job.job)
290          queue->total_jobs_size -= job.job_size;
291       mtx_unlock(&queue->lock);
292 
293       if (job.job) {
294          job.execute(job.job, job.global_data, thread_index);
295          if (job.fence)
296             util_queue_fence_signal(job.fence);
297          if (job.cleanup)
298             job.cleanup(job.job, job.global_data, thread_index);
299       }
300    }
301 
302    /* signal remaining jobs if all threads are being terminated */
303    mtx_lock(&queue->lock);
304    if (queue->num_threads == 0) {
305       for (unsigned i = queue->read_idx; i != queue->write_idx;
306            i = (i + 1) % queue->max_jobs) {
307          if (queue->jobs[i].job) {
308             if (queue->jobs[i].fence)
309                util_queue_fence_signal(queue->jobs[i].fence);
310             queue->jobs[i].job = NULL;
311          }
312       }
313       queue->read_idx = queue->write_idx;
314       queue->num_queued = 0;
315    }
316    mtx_unlock(&queue->lock);
317    return 0;
318 }
319 
320 static bool
util_queue_create_thread(struct util_queue * queue,unsigned index)321 util_queue_create_thread(struct util_queue *queue, unsigned index)
322 {
323    struct thread_input *input =
324       (struct thread_input *) malloc(sizeof(struct thread_input));
325    input->queue = queue;
326    input->thread_index = index;
327 
328    if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
329       free(input);
330       return false;
331    }
332 
333    if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
334 #if defined(__linux__) && defined(SCHED_BATCH)
335       struct sched_param sched_param = {0};
336 
337       /* The nice() function can only set a maximum of 19.
338        * SCHED_BATCH gives the scheduler a hint that this is a latency
339        * insensitive thread.
340        *
341        * Note that Linux only allows decreasing the priority. The original
342        * priority can't be restored.
343        */
344       pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
345 #endif
346    }
347    return true;
348 }
349 
350 void
util_queue_adjust_num_threads(struct util_queue * queue,unsigned num_threads,bool locked)351 util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads,
352                               bool locked)
353 {
354    num_threads = MIN2(num_threads, queue->max_threads);
355    num_threads = MAX2(num_threads, 1);
356 
357    if (!locked)
358       mtx_lock(&queue->lock);
359 
360    unsigned old_num_threads = queue->num_threads;
361 
362    if (num_threads == old_num_threads) {
363       if (!locked)
364          mtx_unlock(&queue->lock);
365       return;
366    }
367 
368    if (num_threads < old_num_threads) {
369       util_queue_kill_threads(queue, num_threads, true);
370       if (!locked)
371          mtx_unlock(&queue->lock);
372       return;
373    }
374 
375    /* Create threads.
376     *
377     * We need to update num_threads first, because threads terminate
378     * when thread_index < num_threads.
379     */
380    queue->num_threads = num_threads;
381    for (unsigned i = old_num_threads; i < num_threads; i++) {
382       if (!util_queue_create_thread(queue, i)) {
383          queue->num_threads = i;
384          break;
385       }
386    }
387 
388    if (!locked)
389       mtx_unlock(&queue->lock);
390 }
391 
392 bool
util_queue_init(struct util_queue * queue,const char * name,unsigned max_jobs,unsigned num_threads,unsigned flags,void * global_data)393 util_queue_init(struct util_queue *queue,
394                 const char *name,
395                 unsigned max_jobs,
396                 unsigned num_threads,
397                 unsigned flags,
398                 void *global_data)
399 {
400    unsigned i;
401 
402    /* Form the thread name from process_name and name, limited to 13
403     * characters. Characters 14-15 are reserved for the thread number.
404     * Character 16 should be 0. Final form: "process:name12"
405     *
406     * If name is too long, it's truncated. If any space is left, the process
407     * name fills it.
408     */
409    const char *process_name = util_get_process_name();
410    int process_len = process_name ? strlen(process_name) : 0;
411    int name_len = strlen(name);
412    const int max_chars = sizeof(queue->name) - 1;
413 
414    name_len = MIN2(name_len, max_chars);
415 
416    /* See if there is any space left for the process name, reserve 1 for
417     * the colon. */
418    process_len = MIN2(process_len, max_chars - name_len - 1);
419    process_len = MAX2(process_len, 0);
420 
421    memset(queue, 0, sizeof(*queue));
422 
423    if (process_len) {
424       snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
425                process_len, process_name, name);
426    } else {
427       snprintf(queue->name, sizeof(queue->name), "%s", name);
428    }
429 
430    queue->create_threads_on_demand = true;
431    queue->flags = flags;
432    queue->max_threads = num_threads;
433    queue->num_threads = 1;
434    queue->max_jobs = max_jobs;
435    queue->global_data = global_data;
436 
437    (void) mtx_init(&queue->lock, mtx_plain);
438 
439    queue->num_queued = 0;
440    cnd_init(&queue->has_queued_cond);
441    cnd_init(&queue->has_space_cond);
442 
443    queue->jobs = (struct util_queue_job*)
444                  calloc(max_jobs, sizeof(struct util_queue_job));
445    if (!queue->jobs)
446       goto fail;
447 
448    queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
449    if (!queue->threads)
450       goto fail;
451 
452    /* start threads */
453    for (i = 0; i < queue->num_threads; i++) {
454       if (!util_queue_create_thread(queue, i)) {
455          if (i == 0) {
456             /* no threads created, fail */
457             goto fail;
458          } else {
459             /* at least one thread created, so use it */
460             queue->num_threads = i;
461             break;
462          }
463       }
464    }
465 
466    add_to_atexit_list(queue);
467    return true;
468 
469 fail:
470    free(queue->threads);
471 
472    if (queue->jobs) {
473       cnd_destroy(&queue->has_space_cond);
474       cnd_destroy(&queue->has_queued_cond);
475       mtx_destroy(&queue->lock);
476       free(queue->jobs);
477    }
478    /* also util_queue_is_initialized can be used to check for success */
479    memset(queue, 0, sizeof(*queue));
480    return false;
481 }
482 
483 static void
util_queue_kill_threads(struct util_queue * queue,unsigned keep_num_threads,bool locked)484 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
485                         bool locked)
486 {
487    /* Signal all threads to terminate. */
488    if (!locked)
489       mtx_lock(&queue->lock);
490 
491    if (keep_num_threads >= queue->num_threads) {
492       if (!locked)
493          mtx_unlock(&queue->lock);
494       return;
495    }
496 
497    unsigned old_num_threads = queue->num_threads;
498    /* Setting num_threads is what causes the threads to terminate.
499     * Then cnd_broadcast wakes them up and they will exit their function.
500     */
501    queue->num_threads = keep_num_threads;
502    cnd_broadcast(&queue->has_queued_cond);
503 
504    /* Wait for threads to terminate. */
505    if (keep_num_threads < old_num_threads) {
506       /* We need to unlock the mutex to allow threads to terminate. */
507       mtx_unlock(&queue->lock);
508       for (unsigned i = keep_num_threads; i < old_num_threads; i++)
509          thrd_join(queue->threads[i], NULL);
510       if (locked)
511          mtx_lock(&queue->lock);
512    } else {
513       if (!locked)
514          mtx_unlock(&queue->lock);
515    }
516 }
517 
518 static void
util_queue_finish_execute(void * data,void * gdata,int num_thread)519 util_queue_finish_execute(void *data, void *gdata, int num_thread)
520 {
521    util_barrier *barrier = data;
522    if (util_barrier_wait(barrier))
523       util_barrier_destroy(barrier);
524 }
525 
526 void
util_queue_destroy(struct util_queue * queue)527 util_queue_destroy(struct util_queue *queue)
528 {
529    util_queue_kill_threads(queue, 0, false);
530 
531    /* This makes it safe to call on a queue that failed util_queue_init. */
532    if (queue->head.next != NULL)
533       remove_from_atexit_list(queue);
534 
535    cnd_destroy(&queue->has_space_cond);
536    cnd_destroy(&queue->has_queued_cond);
537    mtx_destroy(&queue->lock);
538    free(queue->jobs);
539    free(queue->threads);
540 }
541 
542 static void
util_queue_add_job_locked(struct util_queue * queue,void * job,struct util_queue_fence * fence,util_queue_execute_func execute,util_queue_execute_func cleanup,const size_t job_size,bool locked)543 util_queue_add_job_locked(struct util_queue *queue,
544                           void *job,
545                           struct util_queue_fence *fence,
546                           util_queue_execute_func execute,
547                           util_queue_execute_func cleanup,
548                           const size_t job_size,
549                           bool locked)
550 {
551    struct util_queue_job *ptr;
552 
553    if (!locked)
554       mtx_lock(&queue->lock);
555    if (queue->num_threads == 0) {
556       if (!locked)
557          mtx_unlock(&queue->lock);
558       /* well no good option here, but any leaks will be
559        * short-lived as things are shutting down..
560        */
561       return;
562    }
563 
564    if (fence)
565       util_queue_fence_reset(fence);
566 
567    assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
568 
569    /* Scale the number of threads up if there's already one job waiting. */
570    if (queue->num_queued > 0 &&
571        queue->create_threads_on_demand &&
572        execute != util_queue_finish_execute &&
573        queue->num_threads < queue->max_threads) {
574       util_queue_adjust_num_threads(queue, queue->num_threads + 1, true);
575    }
576 
577    if (queue->num_queued == queue->max_jobs) {
578       if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
579           queue->total_jobs_size + job_size < S_256MB) {
580          /* If the queue is full, make it larger to avoid waiting for a free
581           * slot.
582           */
583          unsigned new_max_jobs = queue->max_jobs + 8;
584          struct util_queue_job *jobs =
585             (struct util_queue_job*)calloc(new_max_jobs,
586                                            sizeof(struct util_queue_job));
587          assert(jobs);
588 
589          /* Copy all queued jobs into the new list. */
590          unsigned num_jobs = 0;
591          unsigned i = queue->read_idx;
592 
593          do {
594             jobs[num_jobs++] = queue->jobs[i];
595             i = (i + 1) % queue->max_jobs;
596          } while (i != queue->write_idx);
597 
598          assert(num_jobs == queue->num_queued);
599 
600          free(queue->jobs);
601          queue->jobs = jobs;
602          queue->read_idx = 0;
603          queue->write_idx = num_jobs;
604          queue->max_jobs = new_max_jobs;
605       } else {
606          /* Wait until there is a free slot. */
607          while (queue->num_queued == queue->max_jobs)
608             cnd_wait(&queue->has_space_cond, &queue->lock);
609       }
610    }
611 
612    ptr = &queue->jobs[queue->write_idx];
613    assert(ptr->job == NULL);
614    ptr->job = job;
615    ptr->global_data = queue->global_data;
616    ptr->fence = fence;
617    ptr->execute = execute;
618    ptr->cleanup = cleanup;
619    ptr->job_size = job_size;
620 
621    queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
622    queue->total_jobs_size += ptr->job_size;
623 
624    queue->num_queued++;
625    cnd_signal(&queue->has_queued_cond);
626    if (!locked)
627       mtx_unlock(&queue->lock);
628 }
629 
630 void
util_queue_add_job(struct util_queue * queue,void * job,struct util_queue_fence * fence,util_queue_execute_func execute,util_queue_execute_func cleanup,const size_t job_size)631 util_queue_add_job(struct util_queue *queue,
632                    void *job,
633                    struct util_queue_fence *fence,
634                    util_queue_execute_func execute,
635                    util_queue_execute_func cleanup,
636                    const size_t job_size)
637 {
638    util_queue_add_job_locked(queue, job, fence, execute, cleanup, job_size,
639                              false);
640 }
641 
642 /**
643  * Remove a queued job. If the job hasn't started execution, it's removed from
644  * the queue. If the job has started execution, the function waits for it to
645  * complete.
646  *
647  * In all cases, the fence is signalled when the function returns.
648  *
649  * The function can be used when destroying an object associated with the job
650  * when you don't care about the job completion state.
651  */
652 void
util_queue_drop_job(struct util_queue * queue,struct util_queue_fence * fence)653 util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
654 {
655    bool removed = false;
656 
657    if (util_queue_fence_is_signalled(fence))
658       return;
659 
660    mtx_lock(&queue->lock);
661    for (unsigned i = queue->read_idx; i != queue->write_idx;
662         i = (i + 1) % queue->max_jobs) {
663       if (queue->jobs[i].fence == fence) {
664          if (queue->jobs[i].cleanup)
665             queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);
666 
667          /* Just clear it. The threads will treat as a no-op job. */
668          memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
669          removed = true;
670          break;
671       }
672    }
673    mtx_unlock(&queue->lock);
674 
675    if (removed)
676       util_queue_fence_signal(fence);
677    else
678       util_queue_fence_wait(fence);
679 }
680 
681 /**
682  * Wait until all previously added jobs have completed.
683  */
684 void
util_queue_finish(struct util_queue * queue)685 util_queue_finish(struct util_queue *queue)
686 {
687    util_barrier barrier;
688    struct util_queue_fence *fences;
689 
690    /* If 2 threads were adding jobs for 2 different barries at the same time,
691     * a deadlock would happen, because 1 barrier requires that all threads
692     * wait for it exclusively.
693     */
694    mtx_lock(&queue->lock);
695 
696    /* The number of threads can be changed to 0, e.g. by the atexit handler. */
697    if (!queue->num_threads) {
698       mtx_unlock(&queue->lock);
699       return;
700    }
701 
702    /* We need to disable adding new threads in util_queue_add_job because
703     * the finish operation requires a fixed number of threads.
704     *
705     * Also note that util_queue_add_job can unlock the mutex if there is not
706     * enough space in the queue and wait for space.
707     */
708    queue->create_threads_on_demand = false;
709 
710    fences = malloc(queue->num_threads * sizeof(*fences));
711    util_barrier_init(&barrier, queue->num_threads);
712 
713    for (unsigned i = 0; i < queue->num_threads; ++i) {
714       util_queue_fence_init(&fences[i]);
715       util_queue_add_job_locked(queue, &barrier, &fences[i],
716                                 util_queue_finish_execute, NULL, 0, true);
717    }
718    queue->create_threads_on_demand = true;
719    mtx_unlock(&queue->lock);
720 
721    for (unsigned i = 0; i < queue->num_threads; ++i) {
722       util_queue_fence_wait(&fences[i]);
723       util_queue_fence_destroy(&fences[i]);
724    }
725 
726    free(fences);
727 }
728 
729 int64_t
util_queue_get_thread_time_nano(struct util_queue * queue,unsigned thread_index)730 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
731 {
732    /* Allow some flexibility by not raising an error. */
733    if (thread_index >= queue->num_threads)
734       return 0;
735 
736    return util_thread_get_time_nano(queue->threads[thread_index]);
737 }
738