1 // SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
2 #include <pthread.h>
3 #include <stdlib.h>
4 #include "erofs/workqueue.h"
5
worker_thread(void * arg)6 static void *worker_thread(void *arg)
7 {
8 struct erofs_workqueue *wq = arg;
9 struct erofs_work *work;
10 void *tlsp = NULL;
11
12 if (wq->on_start)
13 tlsp = (wq->on_start)(wq, NULL);
14
15 while (true) {
16 pthread_mutex_lock(&wq->lock);
17
18 while (wq->job_count == 0 && !wq->shutdown)
19 pthread_cond_wait(&wq->cond_empty, &wq->lock);
20 if (wq->job_count == 0 && wq->shutdown) {
21 pthread_mutex_unlock(&wq->lock);
22 break;
23 }
24
25 work = wq->head;
26 wq->head = work->next;
27 if (!wq->head)
28 wq->tail = NULL;
29 wq->job_count--;
30
31 if (wq->job_count == wq->max_jobs - 1)
32 pthread_cond_broadcast(&wq->cond_full);
33
34 pthread_mutex_unlock(&wq->lock);
35 work->fn(work, tlsp);
36 }
37
38 if (wq->on_exit)
39 (void)(wq->on_exit)(wq, tlsp);
40 return NULL;
41 }
42
/*
 * Initialise @wq and start @nworker worker threads that service it.
 *
 * @wq:       caller-provided workqueue to set up.
 * @nworker:  number of worker threads to create; must be non-zero.
 * @max_jobs: job count at which erofs_queue_work() blocks; must be non-zero.
 * @on_start: optional hook each worker calls once before processing jobs.
 * @on_exit:  optional hook each worker calls once before it returns.
 *
 * Returns 0 on success or a negative errno value on failure: -EINVAL for
 * bad arguments, -ENOMEM if the thread array cannot be allocated, or the
 * negated error code reported by the failing pthread call.
 *
 * On failure every resource acquired here is released again.  Workers that
 * were already started are stopped through the regular shutdown path (so
 * their on_start/on_exit hooks run in pairs) and joined before the thread
 * array and the synchronisation objects are torn down.
 */
int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
			  unsigned int max_jobs, erofs_wq_func_t on_start,
			  erofs_wq_func_t on_exit)
{
	unsigned int i;
	int ret;

	if (!wq || !nworker || !max_jobs)
		return -EINVAL;

	wq->head = wq->tail = NULL;
	wq->nworker = nworker;
	wq->max_jobs = max_jobs;
	wq->job_count = 0;
	wq->shutdown = false;
	wq->on_start = on_start;
	wq->on_exit = on_exit;
	wq->workers = NULL;

	/* pthread functions return a positive error number, not -1/errno. */
	ret = pthread_mutex_init(&wq->lock, NULL);
	if (ret)
		return -ret;
	ret = pthread_cond_init(&wq->cond_empty, NULL);
	if (ret)
		goto err_lock;
	ret = pthread_cond_init(&wq->cond_full, NULL);
	if (ret)
		goto err_cond_empty;

	/* calloc() rejects an nworker * element-size product that overflows. */
	wq->workers = calloc(nworker, sizeof(*wq->workers));
	if (!wq->workers) {
		ret = ENOMEM;
		goto err_cond_full;
	}

	for (i = 0; i < nworker; i++) {
		ret = pthread_create(&wq->workers[i], NULL, worker_thread, wq);
		if (ret)
			goto err_workers;
	}
	return 0;

err_workers:
	/*
	 * Do not pthread_cancel() the workers: a thread cancelled inside
	 * pthread_cond_wait() re-acquires the mutex and would exit while
	 * still holding it.  Request a normal shutdown and wait for each
	 * started thread before releasing anything it may still touch.
	 */
	pthread_mutex_lock(&wq->lock);
	wq->shutdown = true;
	pthread_cond_broadcast(&wq->cond_empty);
	pthread_mutex_unlock(&wq->lock);
	while (i)
		pthread_join(wq->workers[--i], NULL);
	free(wq->workers);
	wq->workers = NULL;
err_cond_full:
	pthread_cond_destroy(&wq->cond_full);
err_cond_empty:
	pthread_cond_destroy(&wq->cond_empty);
err_lock:
	pthread_mutex_destroy(&wq->lock);
	return -ret;
}
79
erofs_queue_work(struct erofs_workqueue * wq,struct erofs_work * work)80 int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work)
81 {
82 if (!wq || !work)
83 return -EINVAL;
84
85 pthread_mutex_lock(&wq->lock);
86
87 while (wq->job_count == wq->max_jobs)
88 pthread_cond_wait(&wq->cond_full, &wq->lock);
89
90 work->next = NULL;
91 if (!wq->head)
92 wq->head = work;
93 else
94 wq->tail->next = work;
95 wq->tail = work;
96 wq->job_count++;
97
98 pthread_cond_signal(&wq->cond_empty);
99 pthread_mutex_unlock(&wq->lock);
100 return 0;
101 }
102
erofs_destroy_workqueue(struct erofs_workqueue * wq)103 int erofs_destroy_workqueue(struct erofs_workqueue *wq)
104 {
105 unsigned int i;
106
107 if (!wq)
108 return -EINVAL;
109
110 pthread_mutex_lock(&wq->lock);
111 wq->shutdown = true;
112 pthread_cond_broadcast(&wq->cond_empty);
113 pthread_mutex_unlock(&wq->lock);
114
115 for (i = 0; i < wq->nworker; i++)
116 pthread_join(wq->workers[i], NULL);
117
118 free(wq->workers);
119 pthread_mutex_destroy(&wq->lock);
120 pthread_cond_destroy(&wq->cond_empty);
121 pthread_cond_destroy(&wq->cond_full);
122 return 0;
123 }
124