xref: /aosp_15_r20/external/erofs-utils/lib/workqueue.c (revision 33b1fccf6a0fada2c2875d400ed01119b7676ee5)
1 // SPDX-License-Identifier: GPL-2.0+ OR Apache-2.0
2 #include <pthread.h>
3 #include <stdlib.h>
4 #include "erofs/workqueue.h"
5 
worker_thread(void * arg)6 static void *worker_thread(void *arg)
7 {
8 	struct erofs_workqueue *wq = arg;
9 	struct erofs_work *work;
10 	void *tlsp = NULL;
11 
12 	if (wq->on_start)
13 		tlsp = (wq->on_start)(wq, NULL);
14 
15 	while (true) {
16 		pthread_mutex_lock(&wq->lock);
17 
18 		while (wq->job_count == 0 && !wq->shutdown)
19 			pthread_cond_wait(&wq->cond_empty, &wq->lock);
20 		if (wq->job_count == 0 && wq->shutdown) {
21 			pthread_mutex_unlock(&wq->lock);
22 			break;
23 		}
24 
25 		work = wq->head;
26 		wq->head = work->next;
27 		if (!wq->head)
28 			wq->tail = NULL;
29 		wq->job_count--;
30 
31 		if (wq->job_count == wq->max_jobs - 1)
32 			pthread_cond_broadcast(&wq->cond_full);
33 
34 		pthread_mutex_unlock(&wq->lock);
35 		work->fn(work, tlsp);
36 	}
37 
38 	if (wq->on_exit)
39 		(void)(wq->on_exit)(wq, tlsp);
40 	return NULL;
41 }
42 
/*
 * Initialize @wq and start its worker threads.
 *
 * @wq:       caller-provided workqueue storage to set up
 * @nworker:  number of worker threads to create; must be non-zero
 * @max_jobs: number of queued jobs at which erofs_queue_work() starts to
 *            block; must be non-zero
 * @on_start: optional hook each worker runs before taking jobs
 * @on_exit:  optional hook each worker runs right before it terminates
 *
 * Returns 0 on success or a negative errno value on failure.  On failure
 * every worker that was already started is asked to shut down and joined,
 * and the lock, condition variables and thread array are released again,
 * so @wq must not be passed to erofs_destroy_workqueue() afterwards.
 */
int erofs_alloc_workqueue(struct erofs_workqueue *wq, unsigned int nworker,
			  unsigned int max_jobs, erofs_wq_func_t on_start,
			  erofs_wq_func_t on_exit)
{
	unsigned int i;
	int ret;

	if (!wq || !nworker || !max_jobs)
		return -EINVAL;

	wq->head = wq->tail = NULL;
	wq->nworker = nworker;
	wq->max_jobs = max_jobs;
	wq->job_count = 0;
	wq->shutdown = false;
	wq->on_start = on_start;
	wq->on_exit = on_exit;
	wq->workers = NULL;

	/* pthread_*_init() report failure as a positive error number */
	ret = pthread_mutex_init(&wq->lock, NULL);
	if (ret)
		return -ret;
	ret = pthread_cond_init(&wq->cond_empty, NULL);
	if (ret)
		goto err_lock;
	ret = pthread_cond_init(&wq->cond_full, NULL);
	if (ret)
		goto err_cond_empty;

	/* calloc() rejects an nworker * element-size product that overflows */
	wq->workers = calloc(nworker, sizeof(*wq->workers));
	if (!wq->workers) {
		ret = ENOMEM;
		goto err_cond_full;
	}

	for (i = 0; i < nworker; i++) {
		ret = pthread_create(&wq->workers[i], NULL, worker_thread, wq);
		if (ret)
			goto err_workers;
	}
	return 0;

err_workers:
	/*
	 * Stop the workers that did start through the regular shutdown path
	 * instead of cancelling them: a cancelled thread could die while
	 * owning wq->lock and would never run on_exit.  Joining guarantees
	 * that nobody touches @wq anymore once its resources are torn down.
	 */
	pthread_mutex_lock(&wq->lock);
	wq->shutdown = true;
	pthread_cond_broadcast(&wq->cond_empty);
	pthread_mutex_unlock(&wq->lock);
	while (i)
		pthread_join(wq->workers[--i], NULL);
	free(wq->workers);
	wq->workers = NULL;
err_cond_full:
	pthread_cond_destroy(&wq->cond_full);
err_cond_empty:
	pthread_cond_destroy(&wq->cond_empty);
err_lock:
	pthread_mutex_destroy(&wq->lock);
	/* ret holds a positive error number here; callers get negative errno */
	return -ret;
}
79 
erofs_queue_work(struct erofs_workqueue * wq,struct erofs_work * work)80 int erofs_queue_work(struct erofs_workqueue *wq, struct erofs_work *work)
81 {
82 	if (!wq || !work)
83 		return -EINVAL;
84 
85 	pthread_mutex_lock(&wq->lock);
86 
87 	while (wq->job_count == wq->max_jobs)
88 		pthread_cond_wait(&wq->cond_full, &wq->lock);
89 
90 	work->next = NULL;
91 	if (!wq->head)
92 		wq->head = work;
93 	else
94 		wq->tail->next = work;
95 	wq->tail = work;
96 	wq->job_count++;
97 
98 	pthread_cond_signal(&wq->cond_empty);
99 	pthread_mutex_unlock(&wq->lock);
100 	return 0;
101 }
102 
erofs_destroy_workqueue(struct erofs_workqueue * wq)103 int erofs_destroy_workqueue(struct erofs_workqueue *wq)
104 {
105 	unsigned int i;
106 
107 	if (!wq)
108 		return -EINVAL;
109 
110 	pthread_mutex_lock(&wq->lock);
111 	wq->shutdown = true;
112 	pthread_cond_broadcast(&wq->cond_empty);
113 	pthread_mutex_unlock(&wq->lock);
114 
115 	for (i = 0; i < wq->nworker; i++)
116 		pthread_join(wq->workers[i], NULL);
117 
118 	free(wq->workers);
119 	pthread_mutex_destroy(&wq->lock);
120 	pthread_cond_destroy(&wq->cond_empty);
121 	pthread_cond_destroy(&wq->cond_full);
122 	return 0;
123 }
124