xref: /aosp_15_r20/external/lz4/programs/threadpool.c (revision 27162e4e17433d5aa7cb38e7b6a433a09405fc7f)
1 /*
  threadpool.c - part of lz4 project
3   Copyright (C) Yann Collet 2023
4   GPL v2 License
5 
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2 of the License, or
9   (at your option) any later version.
10 
11   This program is distributed in the hope that it will be useful,
12   but WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14   GNU General Public License for more details.
15 
16   You should have received a copy of the GNU General Public License along
17   with this program; if not, write to the Free Software Foundation, Inc.,
18   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 
20   You can contact the author at :
21   - LZ4 source repository : https://github.com/lz4/lz4
22   - LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
23 */
24 
25 
26 /* ======   Dependencies   ======= */
27 #include <assert.h>
28 #include "lz4conf.h"  /* LZ4IO_MULTITHREAD */
29 #include "threadpool.h"
30 
31 
32 /* ======   Compiler specifics   ====== */
33 #if defined(_MSC_VER)
34 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
35 #endif
36 
37 #if !LZ4IO_MULTITHREAD
38 
39 /* ===================================================== */
40 /* Backup implementation with no multi-threading support */
41 /* ===================================================== */
42 
43 /* Non-zero size, to ensure g_poolCtx != NULL */
44 struct TPool_s {
45     int dummy;
46 };
47 static TPool g_poolCtx;
48 
TPool_create(int numThreads,int queueSize)49 TPool* TPool_create(int numThreads, int queueSize) {
50     (void)numThreads;
51     (void)queueSize;
52     return &g_poolCtx;
53 }
54 
/* TPool_free() (no multi-threading) :
 * Nothing to release. When assertions are enabled, checks that @ctx is
 * either NULL or the placeholder context returned by TPool_create().
 */
void TPool_free(TPool* ctx)
{
    assert(ctx == NULL || ctx == &g_poolCtx);
    (void)ctx;
}
59 
/* TPool_submitJob() (no multi-threading) :
 * Runs the job immediately, in the calling thread:
 * job_function(arg) has returned by the time this function returns.
 * @ctx is not used.
 */
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
    (void)ctx;
    (*job_function)(arg);
}
64 
/* TPool_jobsCompleted() (no multi-threading) :
 * Returns immediately: TPool_submitJob() already ran every job inline,
 * so nothing can still be pending. When assertions are enabled, checks that
 * @ctx is either NULL or the placeholder context.
 */
void TPool_jobsCompleted(TPool* ctx)
{
    assert(ctx == NULL || ctx == &g_poolCtx);
    (void)ctx;
}
69 
70 
71 #elif defined(_WIN32)
72 
/* Windows TPool implementation using I/O Completion Ports */
74 #include <windows.h>
75 
76 typedef struct TPool_s {
77     HANDLE completionPort;
78     HANDLE* workerThreads;
79     int nbWorkers;
80     int queueSize;
81     LONG nbPendingJobs;
82     HANDLE jobSlotAvail;  /* For queue size control */
83     HANDLE allJobsCompleted; /* Event */
84 } TPool;
85 
/* TPool_free() :
 * Stops the workers, waits for them to exit, then releases every resource
 * owned by @pool, and @pool itself.
 * Accepts NULL (no-op) and partially constructed pools, as handed over by the
 * error path of TPool_create() : NULL handles are skipped, and only the
 * leading run of valid thread handles is signalled, joined and closed.
 */
void TPool_free(TPool* pool)
{
    int nbLive = 0;   /* nb of worker threads that were actually started */
    int i;

    if (!pool) return;

    /* Threads are created in index order, so the first NULL handle (creation
     * failure) marks the end of the valid ones; entries after it are never
     * inspected. */
    if (pool->workerThreads != NULL) {
        while (nbLive < pool->nbWorkers && pool->workerThreads[nbLive] != NULL) {
            nbLive++;
        }
    }

    /* Signal workers to exit : one empty completion packet per live worker */
    for (i = 0; i < nbLive; i++) {
        PostQueuedCompletionStatus(pool->completionPort, 0, 0, NULL);
    }

    /* Wait for each worker, then close its handle.
     * Waiting one handle at a time avoids WaitForMultipleObjects(), which
     * fails outright beyond MAXIMUM_WAIT_OBJECTS (64) handles : the pool
     * would then be freed while workers are still using it. */
    for (i = 0; i < nbLive; i++) {
        WaitForSingleObject(pool->workerThreads[i], INFINITE);
        CloseHandle(pool->workerThreads[i]);
    }
    free(pool->workerThreads);

    /* Remaining kernel objects : each one may be missing if creation failed */
    if (pool->completionPort != NULL)   CloseHandle(pool->completionPort);
    if (pool->jobSlotAvail != NULL)     CloseHandle(pool->jobSlotAvail);
    if (pool->allJobsCompleted != NULL) CloseHandle(pool->allJobsCompleted);

    free(pool);
}
115 
WorkerThread(LPVOID lpParameter)116 static DWORD WINAPI WorkerThread(LPVOID lpParameter)
117 {
118     TPool* const pool = (TPool*)lpParameter;
119     DWORD bytesTransferred;
120     ULONG_PTR completionKey;
121     LPOVERLAPPED overlapped;
122 
123     while (GetQueuedCompletionStatus(pool->completionPort,
124                                     &bytesTransferred, &completionKey,
125                                     &overlapped, INFINITE)) {
126 
127         /* End signal */
128         if (overlapped == NULL) { break; }
129 
130         /* Execute job */
131         ((void (*)(void*))completionKey)(overlapped);
132 
133         /* Signal job completion */
134         if (InterlockedDecrement(&pool->nbPendingJobs) == 0) {
135             SetEvent(pool->allJobsCompleted);
136         }
137         ReleaseSemaphore(pool->jobSlotAvail, 1, NULL);
138     }
139 
140     return 0;
141 }
142 
TPool_create(int nbWorkers,int queueSize)143 TPool* TPool_create(int nbWorkers, int queueSize)
144 {
145     TPool* pool;
146 
147     /* parameters sanitization */
148     if (nbWorkers <= 0 || queueSize <= 0) return NULL;
149     if (nbWorkers>LZ4_NBWORKERS_MAX) nbWorkers=LZ4_NBWORKERS_MAX;
150 
151     pool = calloc(1, sizeof(TPool));
152     if (!pool) return NULL;
153 
154     /* Create completion port */
155     pool->completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nbWorkers);
156     if (!pool->completionPort) { goto _cleanup; }
157 
158     /* Create worker threads */
159     pool->nbWorkers = nbWorkers;
160     pool->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * nbWorkers);
161     if (pool->workerThreads == NULL) { goto _cleanup; }
162 
163     {   int i;
164         for (i = 0; i < nbWorkers; i++) {
165             pool->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, pool, 0, NULL);
166             if (!pool->workerThreads[i]) { goto _cleanup; }
167         }
168     }
169 
170     /* Initialize sync objects members */
171     pool->queueSize = queueSize;
172     pool->nbPendingJobs = 0;
173 
174     pool->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL);
175     if (!pool->jobSlotAvail) { goto _cleanup; }
176 
177     pool->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL);
178     if (!pool->allJobsCompleted) { goto _cleanup; }
179 
180     return pool;
181 
182 _cleanup:
183     TPool_free(pool);
184     return NULL;
185 }
186 
187 
/* TPool_submitJob() :
 * Hands job_function(arg) over to the workers.
 * Blocks while no job slot is available (semaphore at 0).
 * The job travels as a completion packet : the function pointer is stored
 * in the completion key, the argument in the OVERLAPPED pointer.
 */
void TPool_submitJob(TPool* pool, void (*job_function)(void*), void* arg)
{
    ULONG_PTR const packetKey = (ULONG_PTR)job_function;
    LPOVERLAPPED const packetArg = (LPOVERLAPPED)arg;

    assert(pool);

    /* Take one slot; it is released by the worker once the job is done */
    WaitForSingleObject(pool->jobSlotAvail, INFINITE);

    /* One more job is pending : clear the "all done" event, then count it */
    ResetEvent(pool->allJobsCompleted);
    InterlockedIncrement(&pool->nbPendingJobs);

    /* Post the job directly to the completion port (byte count is unused) */
    PostQueuedCompletionStatus(pool->completionPort, 0, packetKey, packetArg);
}
203 
/* TPool_jobsCompleted() :
 * Blocks until no submitted job remains pending.
 * Returns immediately when nothing is pending, which includes the cases
 * where no job was ever submitted, or where the function is called again
 * after a previous wait.
 */
void TPool_jobsCompleted(TPool* pool)
{
    assert(pool);
    /* The pending counter is the source of truth; the auto-reset event is
     * only a wake-up hint. Re-checking the counter around every wait avoids
     * both hanging on an event that was never set (or already consumed)
     * and returning early on a stale one.
     * InterlockedCompareExchange(x, 0, 0) is used as an atomic read. */
    while (InterlockedCompareExchange(&pool->nbPendingJobs, 0, 0) != 0) {
        WaitForSingleObject(pool->allJobsCompleted, INFINITE);
    }
}
209 
210 #else
211 
212 /* pthread availability assumed */
213 #include <stdlib.h>  /* malloc, free */
214 #include <pthread.h> /* pthread_* */
215 
/* One unit of work : a callback, and the opaque pointer it is called with */
typedef struct TPool_job_s {
    void (*job_function)(void*);   /* what to run */
    void* arg;                     /* passed as-is to job_function */
} TPool_job;
221 
222 struct TPool_s {
223     pthread_t* threads;
224     size_t threadCapacity;
225     size_t threadLimit;
226 
227     /* The queue is a circular buffer */
228     TPool_job* queue;
229     size_t queueHead;
230     size_t queueTail;
231     size_t queueSize;
232 
233     /* The number of threads working on jobs */
234     size_t numThreadsBusy;
235     /* Indicates if the queue is empty */
236     int queueEmpty;
237 
238     /* The mutex protects the queue */
239     pthread_mutex_t queueMutex;
240     /* Condition variable for pushers to wait on when the queue is full */
241     pthread_cond_t queuePushCond;
242     /* Condition variables for poppers to wait on when the queue is empty */
243     pthread_cond_t queuePopCond;
244     /* Indicates if the queue is shutting down */
245     int shutdown;
246 };
247 
248 static void TPool_shutdown(TPool* ctx);
249 
TPool_free(TPool * ctx)250 void TPool_free(TPool* ctx) {
251     if (!ctx) { return; }
252     TPool_shutdown(ctx);
253     pthread_mutex_destroy(&ctx->queueMutex);
254     pthread_cond_destroy(&ctx->queuePushCond);
255     pthread_cond_destroy(&ctx->queuePopCond);
256     free(ctx->queue);
257     free(ctx->threads);
258     free(ctx);
259 }
260 
261 static void* TPool_thread(void* opaque);
262 
TPool_create(int nbThreads,int queueSize)263 TPool* TPool_create(int nbThreads, int queueSize)
264 {
265     TPool* ctx;
266     /* Check parameters */
267     if (nbThreads<1 || queueSize<1) { return NULL; }
268     /* Allocate the context and zero initialize */
269     ctx = (TPool*)calloc(1, sizeof(TPool));
270     if (!ctx) { return NULL; }
271     /* init pthread variables */
272     {   int error = 0;
273         error |= pthread_mutex_init(&ctx->queueMutex, NULL);
274         error |= pthread_cond_init(&ctx->queuePushCond, NULL);
275         error |= pthread_cond_init(&ctx->queuePopCond, NULL);
276         if (error) { TPool_free(ctx); return NULL; }
277     }
278     /* Initialize the job queue.
279      * It needs one extra space since one space is wasted to differentiate
280      * empty and full queues.
281      */
282     ctx->queueSize = (size_t)queueSize + 1;
283     ctx->queue = (TPool_job*)calloc(1, ctx->queueSize * sizeof(TPool_job));
284     if (ctx->queue == NULL) {
285         TPool_free(ctx);
286         return NULL;
287     }
288     ctx->queueHead = 0;
289     ctx->queueTail = 0;
290     ctx->numThreadsBusy = 0;
291     ctx->queueEmpty = 1;
292     ctx->shutdown = 0;
293     /* Allocate space for the thread handles */
294     ctx->threads = (pthread_t*)calloc(1, (size_t)nbThreads * sizeof(pthread_t));
295     if (ctx->threads == NULL) {
296         TPool_free(ctx);
297         return NULL;
298     }
299     ctx->threadCapacity = 0;
300     /* Initialize the threads */
301     {   int i;
302         for (i = 0; i < nbThreads; ++i) {
303             if (pthread_create(&ctx->threads[i], NULL, &TPool_thread, ctx)) {
304                 ctx->threadCapacity = (size_t)i;
305                 TPool_free(ctx);
306                 return NULL;
307         }   }
308         ctx->threadCapacity = (size_t)nbThreads;
309         ctx->threadLimit = (size_t)nbThreads;
310     }
311     return ctx;
312 }
313 
314 /* TPool_thread() :
315  * Work thread for the thread pool.
316  * Waits for jobs and executes them.
317  * @returns : NULL on failure else non-null.
318  */
TPool_thread(void * opaque)319 static void* TPool_thread(void* opaque) {
320     TPool* const ctx = (TPool*)opaque;
321     if (!ctx) { return NULL; }
322     for (;;) {
323         /* Lock the mutex and wait for a non-empty queue or until shutdown */
324         pthread_mutex_lock(&ctx->queueMutex);
325 
326         while ( ctx->queueEmpty
327             || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
328             if (ctx->shutdown) {
329                 /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
330                  * a few threads will be shutdown while !queueEmpty,
331                  * but enough threads will remain active to finish the queue */
332                 pthread_mutex_unlock(&ctx->queueMutex);
333                 return opaque;
334             }
335             pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
336         }
337         /* Pop a job off the queue */
338         {   TPool_job const job = ctx->queue[ctx->queueHead];
339             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
340             ctx->numThreadsBusy++;
341             ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
342             /* Unlock the mutex, signal a pusher, and run the job */
343             pthread_cond_signal(&ctx->queuePushCond);
344             pthread_mutex_unlock(&ctx->queueMutex);
345 
346             job.job_function(job.arg);
347 
348             /* If the intended queue size was 0, signal after finishing job */
349             pthread_mutex_lock(&ctx->queueMutex);
350             ctx->numThreadsBusy--;
351             pthread_cond_signal(&ctx->queuePushCond);
352             pthread_mutex_unlock(&ctx->queueMutex);
353         }
354     }  /* for (;;) */
355     assert(0);  /* Unreachable */
356 }
357 
358 /*! TPool_shutdown() :
359     Shutdown the queue, wake any sleeping threads, and join all of the threads.
360 */
TPool_shutdown(TPool * ctx)361 static void TPool_shutdown(TPool* ctx) {
362     /* Shut down the queue */
363     pthread_mutex_lock(&ctx->queueMutex);
364     ctx->shutdown = 1;
365     pthread_mutex_unlock(&ctx->queueMutex);
366     /* Wake up sleeping threads */
367     pthread_cond_broadcast(&ctx->queuePushCond);
368     pthread_cond_broadcast(&ctx->queuePopCond);
369     /* Join all of the threads */
370     {   size_t i;
371         for (i = 0; i < ctx->threadCapacity; ++i) {
372             pthread_join(ctx->threads[i], NULL);  /* note : could fail */
373     }   }
374 }
375 
376 
377 /*! TPool_jobsCompleted() :
378  *  Waits for all queued jobs to finish executing.
379  */
TPool_jobsCompleted(TPool * ctx)380 void TPool_jobsCompleted(TPool* ctx){
381     pthread_mutex_lock(&ctx->queueMutex);
382     while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
383         pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
384     }
385     pthread_mutex_unlock(&ctx->queueMutex);
386 }
387 
388 /**
389  * Returns 1 if the queue is full and 0 otherwise.
390  *
391  * When queueSize is 1 (pool was created with an intended queueSize of 0),
392  * then a queue is empty if there is a thread free _and_ no job is waiting.
393  */
isQueueFull(TPool const * ctx)394 static int isQueueFull(TPool const* ctx) {
395     if (ctx->queueSize > 1) {
396         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
397     } else {
398         return (ctx->numThreadsBusy == ctx->threadLimit) ||
399                !ctx->queueEmpty;
400     }
401 }
402 
403 static void
TPool_submitJob_internal(TPool * ctx,void (* job_function)(void *),void * arg)404 TPool_submitJob_internal(TPool* ctx, void (*job_function)(void*), void *arg)
405 {
406     TPool_job job;
407     job.job_function = job_function;
408     job.arg = arg;
409     assert(ctx != NULL);
410     if (ctx->shutdown) return;
411 
412     ctx->queueEmpty = 0;
413     ctx->queue[ctx->queueTail] = job;
414     ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
415     pthread_cond_signal(&ctx->queuePopCond);
416 }
417 
/* TPool_submitJob() :
 * Queues job_function(arg) for execution by a worker thread.
 * Blocks while the queue is full, unless the pool is shutting down;
 * in that case the job is not queued.
 */
void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
    assert(ctx != NULL);
    pthread_mutex_lock(&ctx->queueMutex);

    /* Sleep until a slot frees up, or until shutdown is requested */
    while (!ctx->shutdown && isQueueFull(ctx)) {
        pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }

    TPool_submitJob_internal(ctx, job_function, arg);
    pthread_mutex_unlock(&ctx->queueMutex);
}
429 
#endif  /* !LZ4IO_MULTITHREAD / _WIN32 / pthread */
431