/*
  threadpool.c - part of lz4 project
  Copyright (C) Yann Collet 2023
  GPL v2 License

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License along
  with this program; if not, write to the Free Software Foundation, Inc.,
  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

  You can contact the author at :
  - LZ4 source repository : https://github.com/lz4/lz4
  - LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
*/


/* ====== Dependencies ======= */
#include <assert.h>
#include "lz4conf.h"    /* LZ4IO_MULTITHREAD */
#include "threadpool.h"


/* ====== Compiler specifics ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#endif

#if !LZ4IO_MULTITHREAD

/* ===================================================== */
/* Backup implementation with no multi-threading support */
/* ===================================================== */

/* Non-zero size, to ensure g_poolCtx != NULL */
struct TPool_s {
    int dummy;
};
static TPool g_poolCtx;

TPool* TPool_create(int numThreads, int queueSize) {
    (void)numThreads;
    (void)queueSize;
    return &g_poolCtx;
}

void TPool_free(TPool* ctx) {
    assert(!ctx || ctx == &g_poolCtx);
    (void)ctx;
}

void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg) {
    (void)ctx;
    job_function(arg);
}

void TPool_jobsCompleted(TPool* ctx) {
    assert(!ctx || ctx == &g_poolCtx);
    (void)ctx;
}
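
/* Illustrative usage sketch (not part of this file; names outside the TPool
 * API are hypothetical). The same calling sequence works with every backend
 * below; in this single-threaded backup, each job simply runs synchronously
 * inside TPool_submitJob() :
 *
 *     static void printMsg(void* arg) { puts((const char*)arg); }
 *
 *     TPool* const pool = TPool_create(4, 8);   // 4 workers, queue of 8
 *     if (pool != NULL) {
 *         TPool_submitJob(pool, printMsg, (void*)"job done");
 *         TPool_jobsCompleted(pool);   // block until all jobs finish
 *         TPool_free(pool);
 *     }
 */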


#elif defined(_WIN32)

/* Windows TPool implementation, using Completion Ports */
#include <windows.h>
#include <stdlib.h>   /* malloc, calloc, free */

struct TPool_s {
    HANDLE completionPort;
    HANDLE* workerThreads;
    int nbWorkers;
    int queueSize;
    LONG nbPendingJobs;
    HANDLE jobSlotAvail;     /* semaphore, for queue size control */
    HANDLE allJobsCompleted; /* event */
};

void TPool_free(TPool* pool)
{
    if (!pool) return;

    /* Stop and release worker threads, if any were created
     * (workerThreads can be NULL on the TPool_create() cleanup path) */
    if (pool->workerThreads != NULL) {
        int i;
        /* Signal workers to exit by posting NULL completions */
        for (i = 0; i < pool->nbWorkers; i++) {
            PostQueuedCompletionStatus(pool->completionPort, 0, 0, NULL);
        }
        /* Wait for worker threads to finish */
        WaitForMultipleObjects((DWORD)pool->nbWorkers, pool->workerThreads, TRUE, INFINITE);
        /* Close thread handles */
        for (i = 0; i < pool->nbWorkers; i++) {
            CloseHandle(pool->workerThreads[i]);
        }
        free(pool->workerThreads);
    }

    /* Clean up completion port and synchronization objects */
    if (pool->completionPort) CloseHandle(pool->completionPort);
    if (pool->jobSlotAvail) CloseHandle(pool->jobSlotAvail);
    if (pool->allJobsCompleted) CloseHandle(pool->allJobsCompleted);

    free(pool);
}

static DWORD WINAPI WorkerThread(LPVOID lpParameter)
{
    TPool* const pool = (TPool*)lpParameter;
    DWORD bytesTransferred;
    ULONG_PTR completionKey;
    LPOVERLAPPED overlapped;

    while (GetQueuedCompletionStatus(pool->completionPort,
                                     &bytesTransferred, &completionKey,
                                     &overlapped, INFINITE)) {

        /* End signal */
        if (overlapped == NULL) { break; }

        /* Execute job */
        ((void (*)(void*))completionKey)(overlapped);

        /* Signal job completion */
        if (InterlockedDecrement(&pool->nbPendingJobs) == 0) {
            SetEvent(pool->allJobsCompleted);
        }
        ReleaseSemaphore(pool->jobSlotAvail, 1, NULL);
    }

    return 0;
}
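
/* Job dispatch sketch : posted jobs are not real I/O completions.
 * TPool_submitJob() below repurposes the completion-port fields, roughly :
 *
 *     PostQueuedCompletionStatus(port, 0,
 *                                (ULONG_PTR)job_function,  // completionKey
 *                                (LPOVERLAPPED)arg);       // overlapped
 *
 * and WorkerThread() above undoes the packing, casting completionKey back
 * into a function pointer and overlapped back into the job argument.
 * A NULL overlapped value is reserved as the worker shutdown signal. */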

TPool* TPool_create(int nbWorkers, int queueSize)
{
    TPool* pool;

    /* parameters sanitization */
    if (nbWorkers <= 0 || queueSize <= 0) return NULL;
    if (nbWorkers > LZ4_NBWORKERS_MAX) nbWorkers = LZ4_NBWORKERS_MAX;

    pool = (TPool*)calloc(1, sizeof(TPool));
    if (!pool) return NULL;

    /* Create completion port */
    pool->completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, (DWORD)nbWorkers);
    if (!pool->completionPort) { goto _cleanup; }

    /* Create worker threads */
    pool->nbWorkers = nbWorkers;
    pool->workerThreads = (HANDLE*)malloc(sizeof(HANDLE) * (size_t)nbWorkers);
    if (pool->workerThreads == NULL) { goto _cleanup; }

    { int i;
      for (i = 0; i < nbWorkers; i++) {
          pool->workerThreads[i] = CreateThread(NULL, 0, WorkerThread, pool, 0, NULL);
          if (!pool->workerThreads[i]) {
              /* only clean up the threads actually created */
              pool->nbWorkers = i;
              goto _cleanup;
          }
      }
    }

    /* Initialize synchronization objects */
    pool->queueSize = queueSize;
    pool->nbPendingJobs = 0;

    pool->jobSlotAvail = CreateSemaphore(NULL, queueSize+nbWorkers, queueSize+nbWorkers, NULL);
    if (!pool->jobSlotAvail) { goto _cleanup; }

    pool->allJobsCompleted = CreateEvent(NULL, FALSE, FALSE, NULL);
    if (!pool->allJobsCompleted) { goto _cleanup; }

    return pool;

_cleanup:
    TPool_free(pool);
    return NULL;
}


void TPool_submitJob(TPool* pool, void (*job_function)(void*), void* arg)
{
    assert(pool);

    /* Wait for an available job slot, then register the new pending job */
    WaitForSingleObject(pool->jobSlotAvail, INFINITE);
    ResetEvent(pool->allJobsCompleted);
    InterlockedIncrement(&pool->nbPendingJobs);

    /* Post the job directly to the completion port */
    PostQueuedCompletionStatus(pool->completionPort,
                               0,                       /* Bytes transferred not used */
                               (ULONG_PTR)job_function, /* Store function pointer in completionKey */
                               (LPOVERLAPPED)arg);      /* Store argument in overlapped */
}

void TPool_jobsCompleted(TPool* pool)
{
    assert(pool);
    WaitForSingleObject(pool->allJobsCompleted, INFINITE);
}
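
/* Accounting sketch : jobSlotAvail starts at queueSize + nbWorkers, so with,
 * say, queueSize = 8 and nbWorkers = 4, up to 12 jobs can be in flight
 * (8 waiting + 4 executing) before TPool_submitJob() blocks in
 * WaitForSingleObject(). Each finished job releases one slot back through
 * ReleaseSemaphore(), and the job that brings nbPendingJobs down to 0 sets
 * allJobsCompleted, releasing TPool_jobsCompleted() waiters. */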

#else

/* pthread availability assumed */
#include <stdlib.h>   /* malloc, free */
#include <pthread.h>  /* pthread_* */

/* A job is just a function with an opaque argument */
typedef struct TPool_job_s {
    void (*job_function)(void*);
    void* arg;
} TPool_job;

struct TPool_s {
    pthread_t* threads;
    size_t threadCapacity;
    size_t threadLimit;

    /* The queue is a circular buffer */
    TPool_job* queue;
    size_t queueHead;
    size_t queueTail;
    size_t queueSize;

    /* The number of threads working on jobs */
    size_t numThreadsBusy;
    /* Indicates if the queue is empty */
    int queueEmpty;

    /* The mutex protects the queue */
    pthread_mutex_t queueMutex;
    /* Condition variable for pushers to wait on when the queue is full */
    pthread_cond_t queuePushCond;
    /* Condition variable for poppers to wait on when the queue is empty */
    pthread_cond_t queuePopCond;
    /* Indicates if the queue is shutting down */
    int shutdown;
};
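
/* Circular-buffer convention sketch : for an intended queue size N, queueSize
 * is stored as N+1 and one slot is kept unused, so that head == tail can mean
 * "empty" (tracked by queueEmpty) while "full" is head == (tail+1) % queueSize.
 * For example, with N = 2 (queueSize == 3) : head=0, tail=0 -> empty;
 * after two pushes, tail=2 and head == (tail+1) % 3 holds, i.e. full,
 * so further pushers wait on queuePushCond. */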

static void TPool_shutdown(TPool* ctx);

void TPool_free(TPool* ctx) {
    if (!ctx) { return; }
    TPool_shutdown(ctx);
    pthread_mutex_destroy(&ctx->queueMutex);
    pthread_cond_destroy(&ctx->queuePushCond);
    pthread_cond_destroy(&ctx->queuePopCond);
    free(ctx->queue);
    free(ctx->threads);
    free(ctx);
}

static void* TPool_thread(void* opaque);

TPool* TPool_create(int nbThreads, int queueSize)
{
    TPool* ctx;
    /* Check parameters */
    if (nbThreads < 1 || queueSize < 1) { return NULL; }
    /* Allocate the context and zero initialize */
    ctx = (TPool*)calloc(1, sizeof(TPool));
    if (!ctx) { return NULL; }
    /* Init pthread variables */
    { int error = 0;
      error |= pthread_mutex_init(&ctx->queueMutex, NULL);
      error |= pthread_cond_init(&ctx->queuePushCond, NULL);
      error |= pthread_cond_init(&ctx->queuePopCond, NULL);
      if (error) { TPool_free(ctx); return NULL; }
    }
    /* Initialize the job queue.
     * It needs one extra space since one space is wasted to differentiate
     * empty and full queues.
     */
    ctx->queueSize = (size_t)queueSize + 1;
    ctx->queue = (TPool_job*)calloc(ctx->queueSize, sizeof(TPool_job));
    if (ctx->queue == NULL) {
        TPool_free(ctx);
        return NULL;
    }
    ctx->queueHead = 0;
    ctx->queueTail = 0;
    ctx->numThreadsBusy = 0;
    ctx->queueEmpty = 1;
    ctx->shutdown = 0;
    /* Allocate space for the thread handles */
    ctx->threads = (pthread_t*)calloc((size_t)nbThreads, sizeof(pthread_t));
    if (ctx->threads == NULL) {
        TPool_free(ctx);
        return NULL;
    }
    ctx->threadCapacity = 0;
    /* Initialize the threads */
    { int i;
      for (i = 0; i < nbThreads; ++i) {
          if (pthread_create(&ctx->threads[i], NULL, &TPool_thread, ctx)) {
              /* only the first i threads were started : join only those */
              ctx->threadCapacity = (size_t)i;
              TPool_free(ctx);
              return NULL;
      }   }
      ctx->threadCapacity = (size_t)nbThreads;
      ctx->threadLimit = (size_t)nbThreads;
    }
    return ctx;
}

/* TPool_thread() :
 * Work thread for the thread pool.
 * Waits for jobs and executes them.
 * @return : opaque pointer on success, NULL on failure.
 */
static void* TPool_thread(void* opaque) {
    TPool* const ctx = (TPool*)opaque;
    if (!ctx) { return NULL; }
    for (;;) {
        /* Lock the mutex and wait for a non-empty queue or until shutdown */
        pthread_mutex_lock(&ctx->queueMutex);

        while ( ctx->queueEmpty
            || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
            if (ctx->shutdown) {
                /* shutdown can be honored even when !queueEmpty
                 * (possible when numThreadsBusy >= threadLimit) :
                 * some threads exit while jobs remain queued,
                 * but enough threads stay active to drain the queue */
                pthread_mutex_unlock(&ctx->queueMutex);
                return opaque;
            }
            pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
        }
        /* Pop a job off the queue */
        { TPool_job const job = ctx->queue[ctx->queueHead];
          ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
          ctx->numThreadsBusy++;
          ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
          /* Unlock the mutex, signal a pusher, and run the job */
          pthread_cond_signal(&ctx->queuePushCond);
          pthread_mutex_unlock(&ctx->queueMutex);

          job.job_function(job.arg);

          /* Signal pushers again once the job is done :
           * this wakes threads waiting on a free worker
           * (relevant when the intended queue size was 0)
           * as well as TPool_jobsCompleted() waiters */
          pthread_mutex_lock(&ctx->queueMutex);
          ctx->numThreadsBusy--;
          pthread_cond_signal(&ctx->queuePushCond);
          pthread_mutex_unlock(&ctx->queueMutex);
        }
    }   /* for (;;) */
    assert(0);  /* Unreachable */
}

/*! TPool_shutdown() :
 *  Shut down the queue, wake all sleeping threads, and join them.
 */
static void TPool_shutdown(TPool* ctx) {
    /* Shut down the queue */
    pthread_mutex_lock(&ctx->queueMutex);
    ctx->shutdown = 1;
    pthread_mutex_unlock(&ctx->queueMutex);
    /* Wake up sleeping threads */
    pthread_cond_broadcast(&ctx->queuePushCond);
    pthread_cond_broadcast(&ctx->queuePopCond);
    /* Join all of the threads */
    { size_t i;
      for (i = 0; i < ctx->threadCapacity; ++i) {
          pthread_join(ctx->threads[i], NULL);  /* note : could fail */
    }   }
}


/*! TPool_jobsCompleted() :
 *  Waits for all queued jobs to finish executing.
 */
void TPool_jobsCompleted(TPool* ctx) {
    pthread_mutex_lock(&ctx->queueMutex);
    while (!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
        pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    pthread_mutex_unlock(&ctx->queueMutex);
}

/**
 * Returns 1 if the queue is full and 0 otherwise.
 *
 * When queueSize is 1 (the pool was created with an intended queueSize of 0),
 * the queue counts as full unless a thread is free _and_ no job is waiting.
 */
static int isQueueFull(TPool const* ctx) {
    if (ctx->queueSize > 1) {
        return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
    } else {
        return (ctx->numThreadsBusy == ctx->threadLimit) ||
               !ctx->queueEmpty;
    }
}
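
/* Note : in this file, TPool_create() rejects queueSize < 1, so ctx->queueSize
 * (the intended size + 1) is always >= 2 and the first branch above is the one
 * taken; the queueSize == 1 branch is kept for the case of an intended queue
 * size of 0, which this version never creates. */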

static void
TPool_submitJob_internal(TPool* ctx, void (*job_function)(void*), void* arg)
{
    TPool_job job;
    job.job_function = job_function;
    job.arg = arg;
    assert(ctx != NULL);
    if (ctx->shutdown) return;

    ctx->queueEmpty = 0;
    ctx->queue[ctx->queueTail] = job;
    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
    pthread_cond_signal(&ctx->queuePopCond);
}

void TPool_submitJob(TPool* ctx, void (*job_function)(void*), void* arg)
{
    assert(ctx != NULL);
    pthread_mutex_lock(&ctx->queueMutex);
    /* Wait until there is space in the queue for the new job */
    while (isQueueFull(ctx) && (!ctx->shutdown)) {
        pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    TPool_submitJob_internal(ctx, job_function, arg);
    pthread_mutex_unlock(&ctx->queueMutex);
}

#endif  /* LZ4IO_MULTITHREAD */