xref: /aosp_15_r20/external/pthreadpool/src/pthreads.c (revision b095b0533730c2930f947df924a4486d266faa1a)
1*b095b053SXin Li /* Standard C headers */
2*b095b053SXin Li #include <assert.h>
3*b095b053SXin Li #include <limits.h>
4*b095b053SXin Li #include <stdbool.h>
5*b095b053SXin Li #include <stdint.h>
6*b095b053SXin Li #include <stdlib.h>
7*b095b053SXin Li #include <string.h>
8*b095b053SXin Li 
9*b095b053SXin Li /* Configuration header */
10*b095b053SXin Li #include "threadpool-common.h"
11*b095b053SXin Li 
12*b095b053SXin Li /* POSIX headers */
13*b095b053SXin Li #include <pthread.h>
14*b095b053SXin Li #include <unistd.h>
15*b095b053SXin Li 
16*b095b053SXin Li /* Futex-specific headers */
17*b095b053SXin Li #if PTHREADPOOL_USE_FUTEX
18*b095b053SXin Li 	#if defined(__linux__)
19*b095b053SXin Li 		#include <sys/syscall.h>
20*b095b053SXin Li 		#include <linux/futex.h>
21*b095b053SXin Li 
22*b095b053SXin Li 		/* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */
23*b095b053SXin Li 		#ifndef SYS_futex
24*b095b053SXin Li 			#define SYS_futex __NR_futex
25*b095b053SXin Li 		#endif
26*b095b053SXin Li 		#ifndef FUTEX_PRIVATE_FLAG
27*b095b053SXin Li 			#define FUTEX_PRIVATE_FLAG 128
28*b095b053SXin Li 		#endif
29*b095b053SXin Li 	#elif defined(__EMSCRIPTEN__)
30*b095b053SXin Li 		/* math.h for INFINITY constant */
31*b095b053SXin Li 		#include <math.h>
32*b095b053SXin Li 
33*b095b053SXin Li 		#include <emscripten/threading.h>
34*b095b053SXin Li 	#else
35*b095b053SXin Li 		#error "Platform-specific implementation of futex_wait and futex_wake_all required"
36*b095b053SXin Li 	#endif
37*b095b053SXin Li #endif
38*b095b053SXin Li 
39*b095b053SXin Li /* Windows-specific headers */
40*b095b053SXin Li #ifdef _WIN32
41*b095b053SXin Li 	#include <sysinfoapi.h>
42*b095b053SXin Li #endif
43*b095b053SXin Li 
44*b095b053SXin Li /* Dependencies */
45*b095b053SXin Li #if PTHREADPOOL_USE_CPUINFO
46*b095b053SXin Li 	#include <cpuinfo.h>
47*b095b053SXin Li #endif
48*b095b053SXin Li 
49*b095b053SXin Li /* Public library header */
50*b095b053SXin Li #include <pthreadpool.h>
51*b095b053SXin Li 
52*b095b053SXin Li /* Internal library headers */
53*b095b053SXin Li #include "threadpool-atomics.h"
54*b095b053SXin Li #include "threadpool-object.h"
55*b095b053SXin Li #include "threadpool-utils.h"
56*b095b053SXin Li 
57*b095b053SXin Li 
58*b095b053SXin Li #if PTHREADPOOL_USE_FUTEX
59*b095b053SXin Li 	#if defined(__linux__)
/*
 * Linux futex wait: issues the FUTEX_WAIT operation (process-private variant)
 * on the word at `address`, passing `value` as the expected content and NULL
 * as the timeout argument.
 *
 * Returns the result of syscall(), narrowed to int.
 */
static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
	const int futex_op = FUTEX_WAIT | FUTEX_PRIVATE_FLAG;
	return syscall(SYS_futex, address, futex_op, value, NULL);
}
63*b095b053SXin Li 
/*
 * Linux futex wake: issues the FUTEX_WAKE operation (process-private variant)
 * on the word at `address`, asking for up to INT_MAX waiters to be woken.
 *
 * Returns the result of syscall(), narrowed to int.
 */
static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
	const int futex_op = FUTEX_WAKE | FUTEX_PRIVATE_FLAG;
	const int max_waiters = INT_MAX;
	return syscall(SYS_futex, address, futex_op, max_waiters);
}
67*b095b053SXin Li 	#elif defined(__EMSCRIPTEN__)
/*
 * Emscripten futex wait: forwards to emscripten_futex_wait() with `value` as
 * the expected content of the word at `address` and INFINITY as the timeout.
 *
 * Returns whatever emscripten_futex_wait() returns.
 */
static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
	volatile void* const futex_word = (volatile void*) address;
	return emscripten_futex_wait(futex_word, value, INFINITY);
}
71*b095b053SXin Li 
/*
 * Emscripten futex wake: forwards to emscripten_futex_wake() for the word at
 * `address`, asking for up to INT_MAX waiters to be woken.
 *
 * Returns whatever emscripten_futex_wake() returns.
 */
static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
	volatile void* const futex_word = (volatile void*) address;
	return emscripten_futex_wake(futex_word, INT_MAX);
}
75*b095b053SXin Li 	#else
76*b095b053SXin Li 		#error "Platform-specific implementation of futex_wait and futex_wake_all required"
77*b095b053SXin Li 	#endif
78*b095b053SXin Li #endif
79*b095b053SXin Li 
checkin_worker_thread(struct pthreadpool * threadpool)80*b095b053SXin Li static void checkin_worker_thread(struct pthreadpool* threadpool) {
81*b095b053SXin Li 	#if PTHREADPOOL_USE_FUTEX
82*b095b053SXin Li 		if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
83*b095b053SXin Li 			pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0);
84*b095b053SXin Li 			futex_wake_all(&threadpool->has_active_threads);
85*b095b053SXin Li 		}
86*b095b053SXin Li 	#else
87*b095b053SXin Li 		pthread_mutex_lock(&threadpool->completion_mutex);
88*b095b053SXin Li 		if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
89*b095b053SXin Li 			pthread_cond_signal(&threadpool->completion_condvar);
90*b095b053SXin Li 		}
91*b095b053SXin Li 		pthread_mutex_unlock(&threadpool->completion_mutex);
92*b095b053SXin Li 	#endif
93*b095b053SXin Li }
94*b095b053SXin Li 
wait_worker_threads(struct pthreadpool * threadpool)95*b095b053SXin Li static void wait_worker_threads(struct pthreadpool* threadpool) {
96*b095b053SXin Li 	/* Initial check */
97*b095b053SXin Li 	#if PTHREADPOOL_USE_FUTEX
98*b095b053SXin Li 		uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
99*b095b053SXin Li 		if (has_active_threads == 0) {
100*b095b053SXin Li 			return;
101*b095b053SXin Li 		}
102*b095b053SXin Li 	#else
103*b095b053SXin Li 		size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
104*b095b053SXin Li 		if (active_threads == 0) {
105*b095b053SXin Li 			return;
106*b095b053SXin Li 		}
107*b095b053SXin Li 	#endif
108*b095b053SXin Li 
109*b095b053SXin Li 	/* Spin-wait */
110*b095b053SXin Li 	for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
111*b095b053SXin Li 		pthreadpool_yield();
112*b095b053SXin Li 
113*b095b053SXin Li 		#if PTHREADPOOL_USE_FUTEX
114*b095b053SXin Li 			has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
115*b095b053SXin Li 			if (has_active_threads == 0) {
116*b095b053SXin Li 				return;
117*b095b053SXin Li 			}
118*b095b053SXin Li 		#else
119*b095b053SXin Li 			active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
120*b095b053SXin Li 			if (active_threads == 0) {
121*b095b053SXin Li 				return;
122*b095b053SXin Li 			}
123*b095b053SXin Li 		#endif
124*b095b053SXin Li 	}
125*b095b053SXin Li 
126*b095b053SXin Li 	/* Fall-back to mutex/futex wait */
127*b095b053SXin Li 	#if PTHREADPOOL_USE_FUTEX
128*b095b053SXin Li 		while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) {
129*b095b053SXin Li 			futex_wait(&threadpool->has_active_threads, 1);
130*b095b053SXin Li 		}
131*b095b053SXin Li 	#else
132*b095b053SXin Li 		pthread_mutex_lock(&threadpool->completion_mutex);
133*b095b053SXin Li 		while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) {
134*b095b053SXin Li 			pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
135*b095b053SXin Li 		};
136*b095b053SXin Li 		pthread_mutex_unlock(&threadpool->completion_mutex);
137*b095b053SXin Li 	#endif
138*b095b053SXin Li }
139*b095b053SXin Li 
wait_for_new_command(struct pthreadpool * threadpool,uint32_t last_command,uint32_t last_flags)140*b095b053SXin Li static uint32_t wait_for_new_command(
141*b095b053SXin Li 	struct pthreadpool* threadpool,
142*b095b053SXin Li 	uint32_t last_command,
143*b095b053SXin Li 	uint32_t last_flags)
144*b095b053SXin Li {
145*b095b053SXin Li 	uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
146*b095b053SXin Li 	if (command != last_command) {
147*b095b053SXin Li 		return command;
148*b095b053SXin Li 	}
149*b095b053SXin Li 
150*b095b053SXin Li 	if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
151*b095b053SXin Li 		/* Spin-wait loop */
152*b095b053SXin Li 		for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
153*b095b053SXin Li 			pthreadpool_yield();
154*b095b053SXin Li 
155*b095b053SXin Li 			command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
156*b095b053SXin Li 			if (command != last_command) {
157*b095b053SXin Li 				return command;
158*b095b053SXin Li 			}
159*b095b053SXin Li 		}
160*b095b053SXin Li 	}
161*b095b053SXin Li 
162*b095b053SXin Li 	/* Spin-wait disabled or timed out, fall back to mutex/futex wait */
163*b095b053SXin Li 	#if PTHREADPOOL_USE_FUTEX
164*b095b053SXin Li 		do {
165*b095b053SXin Li 			futex_wait(&threadpool->command, last_command);
166*b095b053SXin Li 			command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
167*b095b053SXin Li 		} while (command == last_command);
168*b095b053SXin Li 	#else
169*b095b053SXin Li 		/* Lock the command mutex */
170*b095b053SXin Li 		pthread_mutex_lock(&threadpool->command_mutex);
171*b095b053SXin Li 		/* Read the command */
172*b095b053SXin Li 		while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) {
173*b095b053SXin Li 			/* Wait for new command */
174*b095b053SXin Li 			pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
175*b095b053SXin Li 		}
176*b095b053SXin Li 		/* Read a new command */
177*b095b053SXin Li 		pthread_mutex_unlock(&threadpool->command_mutex);
178*b095b053SXin Li 	#endif
179*b095b053SXin Li 	return command;
180*b095b053SXin Li }
181*b095b053SXin Li 
thread_main(void * arg)182*b095b053SXin Li static void* thread_main(void* arg) {
183*b095b053SXin Li 	struct thread_info* thread = (struct thread_info*) arg;
184*b095b053SXin Li 	struct pthreadpool* threadpool = thread->threadpool;
185*b095b053SXin Li 	uint32_t last_command = threadpool_command_init;
186*b095b053SXin Li 	struct fpu_state saved_fpu_state = { 0 };
187*b095b053SXin Li 	uint32_t flags = 0;
188*b095b053SXin Li 
189*b095b053SXin Li 	/* Check in */
190*b095b053SXin Li 	checkin_worker_thread(threadpool);
191*b095b053SXin Li 
192*b095b053SXin Li 	/* Monitor new commands and act accordingly */
193*b095b053SXin Li 	for (;;) {
194*b095b053SXin Li 		uint32_t command = wait_for_new_command(threadpool, last_command, flags);
195*b095b053SXin Li 		pthreadpool_fence_acquire();
196*b095b053SXin Li 
197*b095b053SXin Li 		flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
198*b095b053SXin Li 
199*b095b053SXin Li 		/* Process command */
200*b095b053SXin Li 		switch (command & THREADPOOL_COMMAND_MASK) {
201*b095b053SXin Li 			case threadpool_command_parallelize:
202*b095b053SXin Li 			{
203*b095b053SXin Li 				const thread_function_t thread_function =
204*b095b053SXin Li 					(thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
205*b095b053SXin Li 				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
206*b095b053SXin Li 					saved_fpu_state = get_fpu_state();
207*b095b053SXin Li 					disable_fpu_denormals();
208*b095b053SXin Li 				}
209*b095b053SXin Li 
210*b095b053SXin Li 				thread_function(threadpool, thread);
211*b095b053SXin Li 				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
212*b095b053SXin Li 					set_fpu_state(saved_fpu_state);
213*b095b053SXin Li 				}
214*b095b053SXin Li 				break;
215*b095b053SXin Li 			}
216*b095b053SXin Li 			case threadpool_command_shutdown:
217*b095b053SXin Li 				/* Exit immediately: the master thread is waiting on pthread_join */
218*b095b053SXin Li 				return NULL;
219*b095b053SXin Li 			case threadpool_command_init:
220*b095b053SXin Li 				/* To inhibit compiler warning */
221*b095b053SXin Li 				break;
222*b095b053SXin Li 		}
223*b095b053SXin Li 		/* Notify the master thread that we finished processing */
224*b095b053SXin Li 		checkin_worker_thread(threadpool);
225*b095b053SXin Li 		/* Update last command */
226*b095b053SXin Li 		last_command = command;
227*b095b053SXin Li 	};
228*b095b053SXin Li }
229*b095b053SXin Li 
pthreadpool_create(size_t threads_count)230*b095b053SXin Li struct pthreadpool* pthreadpool_create(size_t threads_count) {
231*b095b053SXin Li 	#if PTHREADPOOL_USE_CPUINFO
232*b095b053SXin Li 		if (!cpuinfo_initialize()) {
233*b095b053SXin Li 			return NULL;
234*b095b053SXin Li 		}
235*b095b053SXin Li 	#endif
236*b095b053SXin Li 
237*b095b053SXin Li 	if (threads_count == 0) {
238*b095b053SXin Li 		#if PTHREADPOOL_USE_CPUINFO
239*b095b053SXin Li 			threads_count = cpuinfo_get_processors_count();
240*b095b053SXin Li 		#elif defined(_SC_NPROCESSORS_ONLN)
241*b095b053SXin Li 			threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
242*b095b053SXin Li 			#if defined(__EMSCRIPTEN_PTHREADS__)
243*b095b053SXin Li 				/* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */
244*b095b053SXin Li 				if (threads_count >= 8) {
245*b095b053SXin Li 					threads_count = 8;
246*b095b053SXin Li 				}
247*b095b053SXin Li 			#endif
248*b095b053SXin Li 		#elif defined(_WIN32)
249*b095b053SXin Li 			SYSTEM_INFO system_info;
250*b095b053SXin Li 			ZeroMemory(&system_info, sizeof(system_info));
251*b095b053SXin Li 			GetSystemInfo(&system_info);
252*b095b053SXin Li 			threads_count = (size_t) system_info.dwNumberOfProcessors;
253*b095b053SXin Li 		#else
254*b095b053SXin Li 			#error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required"
255*b095b053SXin Li 		#endif
256*b095b053SXin Li 	}
257*b095b053SXin Li 
258*b095b053SXin Li 	struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
259*b095b053SXin Li 	if (threadpool == NULL) {
260*b095b053SXin Li 		return NULL;
261*b095b053SXin Li 	}
262*b095b053SXin Li 	threadpool->threads_count = fxdiv_init_size_t(threads_count);
263*b095b053SXin Li 	for (size_t tid = 0; tid < threads_count; tid++) {
264*b095b053SXin Li 		threadpool->threads[tid].thread_number = tid;
265*b095b053SXin Li 		threadpool->threads[tid].threadpool = threadpool;
266*b095b053SXin Li 	}
267*b095b053SXin Li 
268*b095b053SXin Li 	/* Thread pool with a single thread computes everything on the caller thread. */
269*b095b053SXin Li 	if (threads_count > 1) {
270*b095b053SXin Li 		pthread_mutex_init(&threadpool->execution_mutex, NULL);
271*b095b053SXin Li 		#if !PTHREADPOOL_USE_FUTEX
272*b095b053SXin Li 			pthread_mutex_init(&threadpool->completion_mutex, NULL);
273*b095b053SXin Li 			pthread_cond_init(&threadpool->completion_condvar, NULL);
274*b095b053SXin Li 			pthread_mutex_init(&threadpool->command_mutex, NULL);
275*b095b053SXin Li 			pthread_cond_init(&threadpool->command_condvar, NULL);
276*b095b053SXin Li 		#endif
277*b095b053SXin Li 
278*b095b053SXin Li 		#if PTHREADPOOL_USE_FUTEX
279*b095b053SXin Li 			pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
280*b095b053SXin Li 		#endif
281*b095b053SXin Li 		pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
282*b095b053SXin Li 
283*b095b053SXin Li 		/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
284*b095b053SXin Li 		for (size_t tid = 1; tid < threads_count; tid++) {
285*b095b053SXin Li 			pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
286*b095b053SXin Li 		}
287*b095b053SXin Li 
288*b095b053SXin Li 		/* Wait until all threads initialize */
289*b095b053SXin Li 		wait_worker_threads(threadpool);
290*b095b053SXin Li 	}
291*b095b053SXin Li 	return threadpool;
292*b095b053SXin Li }
293*b095b053SXin Li 
pthreadpool_parallelize(struct pthreadpool * threadpool,thread_function_t thread_function,const void * params,size_t params_size,void * task,void * context,size_t linear_range,uint32_t flags)294*b095b053SXin Li PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
295*b095b053SXin Li 	struct pthreadpool* threadpool,
296*b095b053SXin Li 	thread_function_t thread_function,
297*b095b053SXin Li 	const void* params,
298*b095b053SXin Li 	size_t params_size,
299*b095b053SXin Li 	void* task,
300*b095b053SXin Li 	void* context,
301*b095b053SXin Li 	size_t linear_range,
302*b095b053SXin Li 	uint32_t flags)
303*b095b053SXin Li {
304*b095b053SXin Li 	assert(threadpool != NULL);
305*b095b053SXin Li 	assert(thread_function != NULL);
306*b095b053SXin Li 	assert(task != NULL);
307*b095b053SXin Li 	assert(linear_range > 1);
308*b095b053SXin Li 
309*b095b053SXin Li 	/* Protect the global threadpool structures */
310*b095b053SXin Li 	pthread_mutex_lock(&threadpool->execution_mutex);
311*b095b053SXin Li 
312*b095b053SXin Li 	#if !PTHREADPOOL_USE_FUTEX
313*b095b053SXin Li 		/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
314*b095b053SXin Li 		pthread_mutex_lock(&threadpool->command_mutex);
315*b095b053SXin Li 	#endif
316*b095b053SXin Li 
317*b095b053SXin Li 	/* Setup global arguments */
318*b095b053SXin Li 	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
319*b095b053SXin Li 	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
320*b095b053SXin Li 	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
321*b095b053SXin Li 	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
322*b095b053SXin Li 
323*b095b053SXin Li 	/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
324*b095b053SXin Li 	const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
325*b095b053SXin Li 	pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
326*b095b053SXin Li 	#if PTHREADPOOL_USE_FUTEX
327*b095b053SXin Li 		pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
328*b095b053SXin Li 	#endif
329*b095b053SXin Li 
330*b095b053SXin Li 	if (params_size != 0) {
331*b095b053SXin Li 		memcpy(&threadpool->params, params, params_size);
332*b095b053SXin Li 		pthreadpool_fence_release();
333*b095b053SXin Li 	}
334*b095b053SXin Li 
335*b095b053SXin Li 	/* Spread the work between threads */
336*b095b053SXin Li 	const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
337*b095b053SXin Li 	size_t range_start = 0;
338*b095b053SXin Li 	for (size_t tid = 0; tid < threads_count.value; tid++) {
339*b095b053SXin Li 		struct thread_info* thread = &threadpool->threads[tid];
340*b095b053SXin Li 		const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
341*b095b053SXin Li 		const size_t range_end = range_start + range_length;
342*b095b053SXin Li 		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
343*b095b053SXin Li 		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
344*b095b053SXin Li 		pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
345*b095b053SXin Li 
346*b095b053SXin Li 		/* The next subrange starts where the previous ended */
347*b095b053SXin Li 		range_start = range_end;
348*b095b053SXin Li 	}
349*b095b053SXin Li 
350*b095b053SXin Li 	/*
351*b095b053SXin Li 	 * Update the threadpool command.
352*b095b053SXin Li 	 * Imporantly, do it after initializing command parameters (range, task, argument, flags)
353*b095b053SXin Li 	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
354*b095b053SXin Li 	 * to ensure the unmasked command is different then the last command, because worker threads
355*b095b053SXin Li 	 * monitor for change in the unmasked command.
356*b095b053SXin Li 	 */
357*b095b053SXin Li 	const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
358*b095b053SXin Li 	const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
359*b095b053SXin Li 
360*b095b053SXin Li 	/*
361*b095b053SXin Li 	 * Store the command with release semantics to guarantee that if a worker thread observes
362*b095b053SXin Li 	 * the new command value, it also observes the updated command parameters.
363*b095b053SXin Li 	 *
364*b095b053SXin Li 	 * Note: release semantics is necessary even with a conditional variable, because the workers might
365*b095b053SXin Li 	 * be waiting in a spin-loop rather than the conditional variable.
366*b095b053SXin Li 	 */
367*b095b053SXin Li 	pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
368*b095b053SXin Li 	#if PTHREADPOOL_USE_FUTEX
369*b095b053SXin Li 		/* Wake up the threads */
370*b095b053SXin Li 		futex_wake_all(&threadpool->command);
371*b095b053SXin Li 	#else
372*b095b053SXin Li 		/* Unlock the command variables before waking up the threads for better performance */
373*b095b053SXin Li 		pthread_mutex_unlock(&threadpool->command_mutex);
374*b095b053SXin Li 
375*b095b053SXin Li 		/* Wake up the threads */
376*b095b053SXin Li 		pthread_cond_broadcast(&threadpool->command_condvar);
377*b095b053SXin Li 	#endif
378*b095b053SXin Li 
379*b095b053SXin Li 	/* Save and modify FPU denormals control, if needed */
380*b095b053SXin Li 	struct fpu_state saved_fpu_state = { 0 };
381*b095b053SXin Li 	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
382*b095b053SXin Li 		saved_fpu_state = get_fpu_state();
383*b095b053SXin Li 		disable_fpu_denormals();
384*b095b053SXin Li 	}
385*b095b053SXin Li 
386*b095b053SXin Li 	/* Do computations as worker #0 */
387*b095b053SXin Li 	thread_function(threadpool, &threadpool->threads[0]);
388*b095b053SXin Li 
389*b095b053SXin Li 	/* Restore FPU denormals control, if needed */
390*b095b053SXin Li 	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
391*b095b053SXin Li 		set_fpu_state(saved_fpu_state);
392*b095b053SXin Li 	}
393*b095b053SXin Li 
394*b095b053SXin Li 	/* Wait until the threads finish computation */
395*b095b053SXin Li 	wait_worker_threads(threadpool);
396*b095b053SXin Li 
397*b095b053SXin Li 	/* Make changes by other threads visible to this thread */
398*b095b053SXin Li 	pthreadpool_fence_acquire();
399*b095b053SXin Li 
400*b095b053SXin Li 	/* Unprotect the global threadpool structures */
401*b095b053SXin Li 	pthread_mutex_unlock(&threadpool->execution_mutex);
402*b095b053SXin Li }
403*b095b053SXin Li 
pthreadpool_destroy(struct pthreadpool * threadpool)404*b095b053SXin Li void pthreadpool_destroy(struct pthreadpool* threadpool) {
405*b095b053SXin Li 	if (threadpool != NULL) {
406*b095b053SXin Li 		const size_t threads_count = threadpool->threads_count.value;
407*b095b053SXin Li 		if (threads_count > 1) {
408*b095b053SXin Li 			#if PTHREADPOOL_USE_FUTEX
409*b095b053SXin Li 				pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
410*b095b053SXin Li 				pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
411*b095b053SXin Li 
412*b095b053SXin Li 				/*
413*b095b053SXin Li 				 * Store the command with release semantics to guarantee that if a worker thread observes
414*b095b053SXin Li 				 * the new command value, it also observes the updated active_threads/has_active_threads values.
415*b095b053SXin Li 				 */
416*b095b053SXin Li 				pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
417*b095b053SXin Li 
418*b095b053SXin Li 				/* Wake up worker threads */
419*b095b053SXin Li 				futex_wake_all(&threadpool->command);
420*b095b053SXin Li 			#else
421*b095b053SXin Li 				/* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
422*b095b053SXin Li 				pthread_mutex_lock(&threadpool->command_mutex);
423*b095b053SXin Li 
424*b095b053SXin Li 				pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
425*b095b053SXin Li 
426*b095b053SXin Li 				/*
427*b095b053SXin Li 				 * Store the command with release semantics to guarantee that if a worker thread observes
428*b095b053SXin Li 				 * the new command value, it also observes the updated active_threads value.
429*b095b053SXin Li 				 *
430*b095b053SXin Li 				 * Note: the release fence inside pthread_mutex_unlock is insufficient,
431*b095b053SXin Li 				 * because the workers might be waiting in a spin-loop rather than the conditional variable.
432*b095b053SXin Li 				 */
433*b095b053SXin Li 				pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
434*b095b053SXin Li 
435*b095b053SXin Li 				/* Wake up worker threads */
436*b095b053SXin Li 				pthread_cond_broadcast(&threadpool->command_condvar);
437*b095b053SXin Li 
438*b095b053SXin Li 				/* Commit the state changes and let workers start processing */
439*b095b053SXin Li 				pthread_mutex_unlock(&threadpool->command_mutex);
440*b095b053SXin Li 			#endif
441*b095b053SXin Li 
442*b095b053SXin Li 			/* Wait until all threads return */
443*b095b053SXin Li 			for (size_t thread = 1; thread < threads_count; thread++) {
444*b095b053SXin Li 				pthread_join(threadpool->threads[thread].thread_object, NULL);
445*b095b053SXin Li 			}
446*b095b053SXin Li 
447*b095b053SXin Li 			/* Release resources */
448*b095b053SXin Li 			pthread_mutex_destroy(&threadpool->execution_mutex);
449*b095b053SXin Li 			#if !PTHREADPOOL_USE_FUTEX
450*b095b053SXin Li 				pthread_mutex_destroy(&threadpool->completion_mutex);
451*b095b053SXin Li 				pthread_cond_destroy(&threadpool->completion_condvar);
452*b095b053SXin Li 				pthread_mutex_destroy(&threadpool->command_mutex);
453*b095b053SXin Li 				pthread_cond_destroy(&threadpool->command_condvar);
454*b095b053SXin Li 			#endif
455*b095b053SXin Li 		}
456*b095b053SXin Li 		#if PTHREADPOOL_USE_CPUINFO
457*b095b053SXin Li 			cpuinfo_deinitialize();
458*b095b053SXin Li 		#endif
459*b095b053SXin Li 		pthreadpool_deallocate(threadpool);
460*b095b053SXin Li 	}
461*b095b053SXin Li }
462