xref: /aosp_15_r20/external/OpenCL-CTS/test_common/harness/ThreadPool.cpp (revision 6467f958c7de8070b317fc65bcb0f6472e388d82)
1 //
2 // Copyright (c) 2017 The Khronos Group Inc.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //    http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 #include "ThreadPool.h"
17 #include "errorHelpers.h"
18 #include "fpcontrol.h"
19 #include <stdio.h>
20 #include <stdlib.h>
21 
22 #if defined(__APPLE__) || defined(__linux__) || defined(_WIN32)
23 // or any other POSIX system
24 
25 #include <atomic>
26 #include <vector>
27 
28 #if defined(_WIN32)
29 #include <windows.h>
30 #if defined(_MSC_VER)
31 #include <intrin.h>
32 #endif
33 #include "mingw_compat.h"
34 #include <process.h>
35 #else // !_WIN32
36 #include <pthread.h>
37 #include <unistd.h>
38 #include <sys/errno.h>
39 #ifdef __linux__
40 #include <sched.h>
41 #endif
42 #endif // !_WIN32
43 
44 // declarations
45 #ifdef _WIN32
46 void ThreadPool_WorkerFunc(void *p);
47 #else
48 void *ThreadPool_WorkerFunc(void *p);
49 #endif
50 void ThreadPool_Init(void);
51 void ThreadPool_Exit(void);
52 
#if defined(__MINGW32__)
// Mutex for implementing super heavy atomic operations if you don't have GCC or
// MSVC
CRITICAL_SECTION gAtomicLock;
#elif defined(__GNUC__) || defined(_MSC_VER)
// GCC and MSVC provide compiler intrinsics for atomics; no lock is needed.
#else
// Generic fallback: a pthread mutex serializes the "atomic" operations.
pthread_mutex_t gAtomicLock;
#endif

#if !defined(_WIN32)
// Keep track of pthread_t's created in ThreadPool_Init() so they can be joined
// in ThreadPool_Exit() and avoid thread leaks.
static std::vector<pthread_t> pthreads;
#endif
67 
68 // Atomic add operator with mem barrier.  Mem barrier needed to protect state
69 // modified by the worker functions.
// Atomically add b to *a and return the PREVIOUS value of *a
// (fetch-and-add semantics). A full memory barrier is implied on the
// GCC/MSVC paths; the mutex paths get ordering from lock/unlock.
cl_int ThreadPool_AtomicAdd(volatile cl_int *a, cl_int b)
{
#if defined(__MINGW32__)
    // No atomics on Mingw32
    EnterCriticalSection(&gAtomicLock);
    cl_int old = *a;
    *a = old + b;
    LeaveCriticalSection(&gAtomicLock);
    return old;
#elif defined(__GNUC__)
    // GCC extension:
    // http://gcc.gnu.org/onlinedocs/gcc/Atomic-Builtins.html#Atomic-Builtins
    return __sync_fetch_and_add(a, b);
    // do we need __sync_synchronize() here, too?  GCC docs are unclear whether
    // __sync_fetch_and_add does a synchronize
#elif defined(_MSC_VER)
    return (cl_int)_InterlockedExchangeAdd((volatile LONG *)a, (LONG)b);
#else
#warning Please add a atomic add implementation here, with memory barrier.  Fallback code is slow.
    if (pthread_mutex_lock(&gAtomicLock))
        log_error("Atomic operation failed. pthread_mutex_lock(&gAtomicLock) "
                  "returned an error\n");
    cl_int old = *a;
    *a = old + b;
    if (pthread_mutex_unlock(&gAtomicLock))
        log_error("Failed to release gAtomicLock. Further atomic operations "
                  "may deadlock!\n");
    return old;
#endif
}
100 
101 #if defined(_WIN32)
102 // Uncomment the following line if Windows XP support is not required.
103 // #define HAS_INIT_ONCE_EXECUTE_ONCE 1
104 
105 #if defined(HAS_INIT_ONCE_EXECUTE_ONCE)
106 #define _INIT_ONCE INIT_ONCE
107 #define _PINIT_ONCE PINIT_ONCE
108 #define _InitOnceExecuteOnce InitOnceExecuteOnce
109 #else // !HAS_INIT_ONCE_EXECUTE_ONCE
110 
// Minimal emulation of Win32 one-time initialization (InitOnceExecuteOnce)
// for pre-Vista Windows. The _INIT_ONCE control word moves through three
// states: UNINITIALIZED -> IN_PROGRESS -> DONE.
typedef volatile LONG _INIT_ONCE;
typedef _INIT_ONCE *_PINIT_ONCE;
typedef BOOL(CALLBACK *_PINIT_ONCE_FN)(_PINIT_ONCE, PVOID, PVOID *);

#define _INIT_ONCE_UNINITIALIZED 0
#define _INIT_ONCE_IN_PROGRESS 1
#define _INIT_ONCE_DONE 2

// Run InitFn exactly once across all threads. The thread that wins the CAS
// from UNINITIALIZED to IN_PROGRESS executes InitFn; every other caller spins
// (with a 1 ms sleep) until the winner marks the control word DONE.
static BOOL _InitOnceExecuteOnce(_PINIT_ONCE InitOnce, _PINIT_ONCE_FN InitFn,
                                 PVOID Parameter, LPVOID *Context)
{
    while (*InitOnce != _INIT_ONCE_DONE)
    {
        if (*InitOnce != _INIT_ONCE_IN_PROGRESS
            && _InterlockedCompareExchange(InitOnce, _INIT_ONCE_IN_PROGRESS,
                                           _INIT_ONCE_UNINITIALIZED)
                == _INIT_ONCE_UNINITIALIZED)
        {
            InitFn(InitOnce, Parameter, Context);
            *InitOnce = _INIT_ONCE_DONE;
            return TRUE;
        }
        Sleep(1);
    }
    return TRUE;
}
137 #endif // !HAS_INIT_ONCE_EXECUTE_ONCE
138 
139 // Uncomment the following line if Windows XP support is not required.
140 // #define HAS_CONDITION_VARIABLE 1
141 
142 #if defined(HAS_CONDITION_VARIABLE)
143 #define _CONDITION_VARIABLE CONDITION_VARIABLE
144 #define _InitializeConditionVariable InitializeConditionVariable
145 #define _SleepConditionVariableCS SleepConditionVariableCS
146 #define _WakeAllConditionVariable WakeAllConditionVariable
147 #else // !HAS_CONDITION_VARIABLE
// Emulated condition variable for pre-Vista Windows (no native
// CONDITION_VARIABLE). A generation counter ensures that only waiters
// present at the time of a broadcast are released.
typedef struct
{
    HANDLE mEvent; // Used to park the thread.
    // Used to protect mWaiters, mGeneration and mReleaseCount:
    CRITICAL_SECTION mLock[1];
    volatile cl_int mWaiters; // Number of threads waiting on this cond var.
    volatile cl_int mGeneration; // Wait generation count.
    volatile cl_int mReleaseCount; // Number of releases to execute before
                                   // resetting the event.
} _CONDITION_VARIABLE;

typedef _CONDITION_VARIABLE *_PCONDITION_VARIABLE;
160 
_InitializeConditionVariable(_PCONDITION_VARIABLE cond_var)161 static void _InitializeConditionVariable(_PCONDITION_VARIABLE cond_var)
162 {
163     cond_var->mEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
164     InitializeCriticalSection(cond_var->mLock);
165     cond_var->mWaiters = 0;
166     cond_var->mGeneration = 0;
167 #if !defined(NDEBUG)
168     cond_var->mReleaseCount = 0;
169 #endif // !NDEBUG
170 }
171 
// Emulation of Win32 SleepConditionVariableCS for pre-Vista Windows.
// Releases cond_lock, blocks until _WakeAllConditionVariable starts a new
// wait generation, then re-acquires cond_lock before returning. The
// "ignored" timeout parameter is unused (all waits are effectively INFINITE).
static void _SleepConditionVariableCS(_PCONDITION_VARIABLE cond_var,
                                      PCRITICAL_SECTION cond_lock,
                                      DWORD ignored)
{
    // Register as a waiter and snapshot the current generation so we only
    // accept wake-ups issued after this point.
    EnterCriticalSection(cond_var->mLock);
    cl_int generation = cond_var->mGeneration;
    ++cond_var->mWaiters;
    LeaveCriticalSection(cond_var->mLock);
    LeaveCriticalSection(cond_lock);

    while (TRUE)
    {
        WaitForSingleObject(cond_var->mEvent, INFINITE);
        EnterCriticalSection(cond_var->mLock);
        // Only consume a wake-up that belongs to a newer generation and still
        // has releases outstanding; anything else is a stale/spurious wake.
        BOOL done =
            cond_var->mReleaseCount > 0 && cond_var->mGeneration != generation;
        LeaveCriticalSection(cond_var->mLock);
        if (done)
        {
            break;
        }
    }

    // Re-acquire the caller's lock first (condition-variable contract), then
    // account for our release under the internal lock.
    EnterCriticalSection(cond_lock);
    EnterCriticalSection(cond_var->mLock);
    if (--cond_var->mReleaseCount == 0)
    {
        // Last released waiter resets the manual-reset event.
        ResetEvent(cond_var->mEvent);
    }
    --cond_var->mWaiters;
    LeaveCriticalSection(cond_var->mLock);
}
204 
_WakeAllConditionVariable(_PCONDITION_VARIABLE cond_var)205 static void _WakeAllConditionVariable(_PCONDITION_VARIABLE cond_var)
206 {
207     EnterCriticalSection(cond_var->mLock);
208     if (cond_var->mWaiters > 0)
209     {
210         ++cond_var->mGeneration;
211         cond_var->mReleaseCount = cond_var->mWaiters;
212         SetEvent(cond_var->mEvent);
213     }
214     LeaveCriticalSection(cond_var->mLock);
215 }
216 #endif // !HAS_CONDITION_VARIABLE
217 #endif // _WIN32
218 
// Item values at or above MAX_COUNT are treated as the shutdown sentinel by
// the workers (ThreadPool_Exit sets gRunCount to CL_INT_MAX to trigger it).
#define MAX_COUNT (1 << 29)

// Global state to coordinate whether the threads have been launched
// successfully or not
#if defined(_MSC_VER) && (_WIN32_WINNT >= 0x600)
static _INIT_ONCE threadpool_init_control;
#elif defined(_WIN32) // MingW of XP
static int threadpool_init_control;
#else // Posix platforms
pthread_once_t threadpool_init_control = PTHREAD_ONCE_INIT;
#endif
cl_int threadPoolInitErr = -1; // set to CL_SUCCESS on successful thread launch

// critical region lock around ThreadPool_Do.  We can only run one ThreadPool_Do
// at a time, because we are too lazy to set up a queue here, and don't expect
// to need one.
#if defined(_WIN32)
CRITICAL_SECTION gThreadPoolLock[1];
#else // !_WIN32
pthread_mutex_t gThreadPoolLock;
#endif // !_WIN32

// Condition variable to park ThreadPool threads when not working
#if defined(_WIN32)
CRITICAL_SECTION cond_lock[1];
_CONDITION_VARIABLE cond_var[1];
#else // !_WIN32
pthread_mutex_t cond_lock;
pthread_cond_t cond_var;
#endif // !_WIN32

// Condition variable state. How many iterations on the function left to run,
// set to CL_INT_MAX to cause worker threads to exit. Note: this value might
// go negative.
std::atomic<cl_int> gRunCount{ 0 };

// State that only changes when the threadpool is not working.
volatile TPFuncPtr gFunc_ptr = NULL; // user callback for the current job
volatile void *gUserInfo = NULL; // opaque user data forwarded to gFunc_ptr
volatile cl_int gJobCount = 0; // total iteration count of the current job

// State that may change while the thread pool is working
volatile cl_int jobError = CL_SUCCESS; // err code return for the job as a whole

// Condition variable to park caller while waiting
#if defined(_WIN32)
HANDLE caller_event;
#else // !_WIN32
pthread_mutex_t caller_cond_lock;
pthread_cond_t caller_cond_var;
#endif // !_WIN32

// # of threads intended to be running. Running threads will decrement this
// as they discover they've run out of work to do.
std::atomic<cl_int> gRunning{ 0 };

// The total number of threads launched.
std::atomic<cl_int> gThreadCount{ 0 };
277 
// Worker thread entry point. Each worker claims a unique thread ID by
// incrementing the shared std::atomic<cl_uint> counter passed in via p, then
// repeatedly claims job items by decrementing gRunCount. Items <= 0 mean "no
// work": the worker parks on cond_var until ThreadPool_Do broadcasts new
// work. Items >= MAX_COUNT mean "shut down": the worker exits. For a valid
// item, the user callback is invoked as gFunc_ptr(item - 1, threadID, ...);
// a non-zero callback result is latched into jobError and cancels the
// remaining items by zeroing gRunCount.
#ifdef _WIN32
void ThreadPool_WorkerFunc(void *p)
#else
void *ThreadPool_WorkerFunc(void *p)
#endif
{
    auto &tid = *static_cast<std::atomic<cl_uint> *>(p);
    cl_uint threadID = tid++;
    cl_int item = gRunCount--;

    while (MAX_COUNT > item)
    {
        cl_int err;

        // check for more work to do
        if (0 >= item)
        {
            // No work to do. Attempt to block waiting for work
#if defined(_WIN32)
            EnterCriticalSection(cond_lock);
#else // !_WIN32
            if ((err = pthread_mutex_lock(&cond_lock)))
            {
                log_error(
                    "Error %d from pthread_mutex_lock. Worker %d unable to "
                    "block waiting for work. ThreadPool_WorkerFunc failed.\n",
                    err, threadID);
                goto exit;
            }
#endif // !_WIN32

            cl_int remaining = gRunning--;
            if (1 == remaining)
            { // last thread out signal the main thread to wake up
#if defined(_WIN32)
                SetEvent(caller_event);
#else // !_WIN32
                if ((err = pthread_mutex_lock(&caller_cond_lock)))
                {
                    log_error("Error %d from pthread_mutex_lock. Unable to "
                              "wake caller.\n",
                              err);
                    goto exit;
                }
                if ((err = pthread_cond_broadcast(&caller_cond_var)))
                {
                    log_error(
                        "Error %d from pthread_cond_broadcast. Unable to wake "
                        "up main thread. ThreadPool_WorkerFunc failed.\n",
                        err);
                    goto exit;
                }
                if ((err = pthread_mutex_unlock(&caller_cond_lock)))
                {
                    log_error("Error %d from pthread_mutex_lock. Unable to "
                              "wake caller.\n",
                              err);
                    goto exit;
                }
#endif // !_WIN32
            }

            // loop in case we are woken only to discover that some other thread
            // already did all the work
            while (0 >= item)
            {
#if defined(_WIN32)
                _SleepConditionVariableCS(cond_var, cond_lock, INFINITE);
#else // !_WIN32
                if ((err = pthread_cond_wait(&cond_var, &cond_lock)))
                {
                    log_error(
                        "Error %d from pthread_cond_wait. Unable to block for "
                        "waiting for work. ThreadPool_WorkerFunc failed.\n",
                        err);
                    pthread_mutex_unlock(&cond_lock);
                    goto exit;
                }
#endif // !_WIN32

                // try again to get a valid item id
                item = gRunCount--;
                if (MAX_COUNT <= item) // exit if we are done
                {
#if defined(_WIN32)
                    LeaveCriticalSection(cond_lock);
#else // !_WIN32
                    pthread_mutex_unlock(&cond_lock);
#endif // !_WIN32
                    goto exit;
                }
            }

            // We have work again: count ourselves back in before releasing
            // the lock so the caller's "all done" test stays accurate.
            gRunning++;

#if defined(_WIN32)
            LeaveCriticalSection(cond_lock);
#else // !_WIN32
            if ((err = pthread_mutex_unlock(&cond_lock)))
            {
                log_error(
                    "Error %d from pthread_mutex_unlock. Unable to block for "
                    "waiting for work. ThreadPool_WorkerFunc failed.\n",
                    err);
                goto exit;
            }
#endif // !_WIN32
        }

        // we have a valid item, so do the work
        // but only if we haven't already encountered an error
        if (CL_SUCCESS == jobError)
        {
            // log_info("Thread %d doing job %d\n", threadID, item - 1);

#if defined(__APPLE__) && defined(__arm__)
            // On most platforms which support denorm, default is FTZ off.
            // However, on some hardware where the reference is computed,
            // default might be flush denorms to zero e.g. arm. This creates
            // issues in result verification. Since spec allows the
            // implementation to either flush or not flush denorms to zero, an
            // implementation may choose not be flush i.e. return denorm result
            // whereas reference result may be zero (flushed denorm). Hence we
            // need to disable denorm flushing on host side where reference is
            // being computed to make sure we get non-flushed reference result.
            // If implementation returns flushed result, we correctly take care
            // of that in verification code.
            FPU_mode_type oldMode;
            DisableFTZ(&oldMode);
#endif

            // Call the user's function with this item ID
            err = gFunc_ptr(item - 1, threadID, (void *)gUserInfo);
#if defined(__APPLE__) && defined(__arm__)
            // Restore FP state
            RestoreFPState(&oldMode);
#endif

            if (err)
            {
#if (__MINGW32__)
                EnterCriticalSection(&gAtomicLock);
                if (jobError == CL_SUCCESS) jobError = err;
                gRunCount = 0;
                LeaveCriticalSection(&gAtomicLock);
#elif defined(__GNUC__)
                // GCC extension:
                // http://gcc.gnu.org/onlinedocs/gcc/Atomic-Builtins.html#Atomic-Builtins
                // set the new error if we are the first one there.
                __sync_val_compare_and_swap(&jobError, CL_SUCCESS, err);

                // drop run count to 0
                gRunCount = 0;
                __sync_synchronize();
#elif defined(_MSC_VER)
                // set the new error if we are the first one there.
                _InterlockedCompareExchange((volatile LONG *)&jobError, err,
                                            CL_SUCCESS);

                // drop run count to 0
                gRunCount = 0;
                _mm_mfence();
#else
                if (pthread_mutex_lock(&gAtomicLock))
                    log_error(
                        "Atomic operation failed. "
                        "pthread_mutex_lock(&gAtomicLock) returned an error\n");
                if (jobError == CL_SUCCESS) jobError = err;
                gRunCount = 0;
                if (pthread_mutex_unlock(&gAtomicLock))
                    log_error("Failed to release gAtomicLock. Further atomic "
                              "operations may deadlock\n");
#endif
            }
        }

        // get the next item
        item = gRunCount--;
    }

exit:
    log_info("ThreadPool: thread %d exiting.\n", threadID);
    gThreadCount--;
#if !defined(_WIN32)
    return NULL;
#endif
}
465 
// SetThreadCount() may be used to artificially set the number of worker threads
// If the value is 0 (the default) the number of threads will be determined
// based on the number of CPU cores.  If it is a unicore machine, then 2 will be
// used, so that we still get some testing for thread safety.
//
// If count < 2 or the CL_TEST_SINGLE_THREADED environment variable is set then
// the code will run single threaded, but will report an error to indicate that
// the test is invalid.  This option is intended for debugging purposes only. It
// is suggested as a convention that test apps set the thread count to 1 in
// response to the -m flag.
//
// SetThreadCount() must be called before the first call to GetThreadCount() or
// ThreadPool_Do(), otherwise the behavior is undefined.
SetThreadCount(int count)479 void SetThreadCount(int count)
480 {
481     if (threadPoolInitErr == CL_SUCCESS)
482     {
483         log_error("Error: It is illegal to set the thread count after the "
484                   "first call to ThreadPool_Do or GetThreadCount\n");
485         abort();
486     }
487 
488     gThreadCount = count;
489 }
490 
// One-time pool setup (invoked via pthread_once / InitOnceExecuteOnce from
// ThreadPool_Do). Determines the thread count, initializes all locks and
// condition variables, launches the workers, and blocks until every worker
// has parked itself waiting for work. On success, threadPoolInitErr is set
// to CL_SUCCESS; any failure leaves it non-zero so ThreadPool_Do falls back
// to single-threaded execution.
void ThreadPool_Init(void)
{
    cl_int i;
    int err;
    // Shared counter handed to each worker so it can claim a unique thread ID.
    std::atomic<cl_uint> threadID{ 0 };

    // Check for manual override of multithreading code. We add this for better
    // debuggability.
    if (getenv("CL_TEST_SINGLE_THREADED"))
    {
        log_error("ERROR: CL_TEST_SINGLE_THREADED is set in the environment. "
                  "Running single threaded.\n*** TEST IS INVALID! ***\n");
        gThreadCount = 1;
        return;
    }

    // Figure out how many threads to run -- check first for non-zero to give
    // the implementation the chance
    if (0 == gThreadCount)
    {
#if defined(_MSC_VER) || defined(__MINGW64__)
        PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buffer = NULL;
        DWORD length = 0;

        GetLogicalProcessorInformation(NULL, &length);
        buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION)malloc(length);
        if (buffer != NULL)
        {
            if (GetLogicalProcessorInformation(buffer, &length) == TRUE)
            {
                PSYSTEM_LOGICAL_PROCESSOR_INFORMATION ptr = buffer;
                while (
                    ptr
                    < &buffer[length
                              / sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION)])
                {
                    if (ptr->Relationship == RelationProcessorCore)
                    {
                        // Count the number of bits in ProcessorMask (number of
                        // logical cores)
                        ULONG_PTR mask = ptr->ProcessorMask;
                        while (mask)
                        {
                            ++gThreadCount;
                            mask &= mask - 1; // Remove 1 bit at a time
                        }
                    }
                    ++ptr;
                }
            }
            free(buffer);
        }
#elif defined(__MINGW32__)
        {
#warning How about this, instead of hard coding it to 2?
            SYSTEM_INFO sysinfo;
            GetSystemInfo(&sysinfo);
            gThreadCount = sysinfo.dwNumberOfProcessors;
        }
#elif defined(__linux__) && !defined(__ANDROID__)
        cpu_set_t affinity;
        if (0 == sched_getaffinity(0, sizeof(cpu_set_t), &affinity))
        {
#if !(defined(CPU_COUNT))
            gThreadCount = 1;
#else
            gThreadCount = CPU_COUNT(&affinity);
#endif
        }
        else
        {
            // Hopefully your system returns logical cpus here, as does MacOS X
            gThreadCount = (cl_int)sysconf(_SC_NPROCESSORS_CONF);
        }
#else /* !_WIN32 */
        // Hopefully your system returns logical cpus here, as does MacOS X
        gThreadCount = (cl_int)sysconf(_SC_NPROCESSORS_CONF);
#endif // !_WIN32

        // Multithreaded tests are required to run multithreaded even on unicore
        // systems so as to test thread safety
        if (1 == gThreadCount) gThreadCount = 2;
    }

// When working in 32 bit limit the thread number to 12
// This fix was made due to memory issues in integer_ops test
// When running integer_ops, the test opens as many threads as the
// machine has and each thread allocates a fixed amount of memory
// When running this test on dual socket machine in 32-bit, the
// process memory is not sufficient and the test fails
#if defined(_WIN32) && !defined(_M_X64)
    if (gThreadCount > 12)
    {
        gThreadCount = 12;
    }
#endif

    // Allow the app to set thread count to <0 for debugging purposes.
    // This will cause the test to run single threaded.
    if (gThreadCount < 2)
    {
        log_error("ERROR: Running single threaded because thread count < 2. "
                  "\n*** TEST IS INVALID! ***\n");
        gThreadCount = 1;
        return;
    }

#if defined(_WIN32)
    InitializeCriticalSection(gThreadPoolLock);
    InitializeCriticalSection(cond_lock);
    _InitializeConditionVariable(cond_var);
    caller_event = CreateEvent(NULL, FALSE, FALSE, NULL);
#elif defined(__GNUC__)
    // Dont rely on PTHREAD_MUTEX_INITIALIZER for intialization of a mutex since
    // it might cause problem with some flavors of gcc compilers.
    pthread_cond_init(&cond_var, NULL);
    pthread_mutex_init(&cond_lock, NULL);
    pthread_cond_init(&caller_cond_var, NULL);
    pthread_mutex_init(&caller_cond_lock, NULL);
    pthread_mutex_init(&gThreadPoolLock, NULL);
#endif

#if !(defined(__GNUC__) || defined(_MSC_VER) || defined(__MINGW32__))
    pthread_mutex_initialize(gAtomicLock);
#elif defined(__MINGW32__)
    InitializeCriticalSection(&gAtomicLock);
#endif
    // Make sure the last thread done in the work pool doesn't signal us to wake
    // before we get to the point where we are supposed to wait
    //  That would cause a deadlock.
#if !defined(_WIN32)
    if ((err = pthread_mutex_lock(&caller_cond_lock)))
    {
        log_error("Error %d from pthread_mutex_lock. Unable to block for work "
                  "to finish. ThreadPool_Init failed.\n",
                  err);
        gThreadCount = 1;
        return;
    }
#endif // !_WIN32

    gRunning = gThreadCount.load();
    // init threads
    for (i = 0; i < gThreadCount; i++)
    {
#if defined(_WIN32)
        uintptr_t handle =
            _beginthread(ThreadPool_WorkerFunc, 0, (void *)&threadID);
        err = (handle == 0);
#else // !_WIN32
        pthread_t tid = 0;
        err = pthread_create(&tid, NULL, ThreadPool_WorkerFunc,
                             (void *)&threadID);
#endif // !_WIN32
        if (err)
        {
            log_error("Error %d launching thread %d\n", err, i);
            threadPoolInitErr = err;
            gThreadCount = i;
            break;
        }
#if !defined(_WIN32)
        pthreads.push_back(tid);
#endif // !_WIN32
    }

    atexit(ThreadPool_Exit);

    // block until they are done launching.
    do
    {
#if defined(_WIN32)
        WaitForSingleObject(caller_event, INFINITE);
#else // !_WIN32
        if ((err = pthread_cond_wait(&caller_cond_var, &caller_cond_lock)))
        {
            log_error("Error %d from pthread_cond_wait. Unable to block for "
                      "work to finish. ThreadPool_Init failed.\n",
                      err);
            pthread_mutex_unlock(&caller_cond_lock);
            return;
        }
#endif // !_WIN32
        // Every worker decrements gRunCount once on startup, so all workers
        // are parked exactly when gRunCount reaches -gThreadCount.
    } while (gRunCount != -gThreadCount);
#if !defined(_WIN32)
    if ((err = pthread_mutex_unlock(&caller_cond_lock)))
    {
        log_error("Error %d from pthread_mutex_unlock. Unable to block for "
                  "work to finish. ThreadPool_Init failed.\n",
                  err);
        return;
    }
#endif // !_WIN32

    threadPoolInitErr = CL_SUCCESS;
}
687 
#if defined(_MSC_VER)
// Adapter so ThreadPool_Init can be used as an InitOnceExecuteOnce callback.
// The InitOnce/Parameter/lpContex arguments are required by the callback
// signature but unused here.
static BOOL CALLBACK _ThreadPool_Init(_PINIT_ONCE InitOnce, PVOID Parameter,
                                      PVOID *lpContex)
{
    ThreadPool_Init();
    return TRUE;
}
#endif
696 
ThreadPool_Exit(void)697 void ThreadPool_Exit(void)
698 {
699     gRunCount = CL_INT_MAX;
700 
701 #if defined(__GNUC__)
702     // GCC extension:
703     // http://gcc.gnu.org/onlinedocs/gcc/Atomic-Builtins.html#Atomic-Builtins
704     __sync_synchronize();
705 #elif defined(_MSC_VER)
706     _mm_mfence();
707 #else
708 #warning If this is a weakly ordered memory system, please add a memory barrier here to force this and everything else to memory before we proceed
709 #endif
710 
711     // spin waiting for threads to die
712     for (int count = 0; 0 != gThreadCount && count < 1000; count++)
713     {
714 #if defined(_WIN32)
715         _WakeAllConditionVariable(cond_var);
716         Sleep(1);
717 #else // !_WIN32
718         if (int err = pthread_cond_broadcast(&cond_var))
719         {
720             log_error("Error %d from pthread_cond_broadcast. Unable to wake up "
721                       "work threads. ThreadPool_Exit failed.\n",
722                       err);
723             break;
724         }
725         usleep(1000);
726 #endif // !_WIN32
727     }
728 
729     if (gThreadCount)
730         log_error("Error: Thread pool timed out after 1 second with %d threads "
731                   "still active.\n",
732                   gThreadCount.load());
733     else
734     {
735 #if !defined(_WIN32)
736         for (pthread_t pthread : pthreads)
737         {
738             if (int err = pthread_join(pthread, nullptr))
739             {
740                 log_error("Error from %d from pthread_join. Unable to join "
741                           "work threads. ThreadPool_Exit failed.\n",
742                           err);
743             }
744         }
745 #endif
746         log_info("Thread pool exited in a orderly fashion.\n");
747     }
748 }
749 
750 
751 // Blocking API that farms out count jobs to a thread pool.
752 // It may return with some work undone if func_ptr() returns a non-zero
753 // result.
754 //
755 // This function obviously has its shortcommings. Only one call to ThreadPool_Do
756 // can be running at a time. It is not intended for general purpose use.
757 // If clEnqueueNativeKernelFn, out of order queues and a CL_DEVICE_TYPE_CPU were
758 // all available then it would make more sense to use those features.
ThreadPool_Do(TPFuncPtr func_ptr,cl_uint count,void * userInfo)759 cl_int ThreadPool_Do(TPFuncPtr func_ptr, cl_uint count, void *userInfo)
760 {
761 #ifndef _WIN32
762     cl_int newErr;
763 #endif
764     cl_int err = 0;
765     // Lazily set up our threads
766 #if defined(_MSC_VER) && (_WIN32_WINNT >= 0x600)
767     err = !_InitOnceExecuteOnce(&threadpool_init_control, _ThreadPool_Init,
768                                 NULL, NULL);
769 #elif defined(_WIN32)
770     if (threadpool_init_control == 0)
771     {
772 #warning This is buggy and race prone.  Find a better way.
773         ThreadPool_Init();
774         threadpool_init_control = 1;
775     }
776 #else // posix platform
777     err = pthread_once(&threadpool_init_control, ThreadPool_Init);
778     if (err)
779     {
780         log_error("Error %d from pthread_once. Unable to init threads. "
781                   "ThreadPool_Do failed.\n",
782                   err);
783         return err;
784     }
785 #endif
786     // Single threaded code to handle case where threadpool wasn't allocated or
787     // was disabled by environment variable
788     if (threadPoolInitErr)
789     {
790         cl_uint currentJob = 0;
791         cl_int result = CL_SUCCESS;
792 
793 #if defined(__APPLE__) && defined(__arm__)
794         // On most platforms which support denorm, default is FTZ off. However,
795         // on some hardware where the reference is computed, default might be
796         // flush denorms to zero e.g. arm. This creates issues in result
797         // verification. Since spec allows the implementation to either flush or
798         // not flush denorms to zero, an implementation may choose not be flush
799         // i.e. return denorm result whereas reference result may be zero
800         // (flushed denorm). Hence we need to disable denorm flushing on host
801         // side where reference is being computed to make sure we get
802         // non-flushed reference result. If implementation returns flushed
803         // result, we correctly take care of that in verification code.
804         FPU_mode_type oldMode;
805         DisableFTZ(&oldMode);
806 #endif
807         for (currentJob = 0; currentJob < count; currentJob++)
808             if ((result = func_ptr(currentJob, 0, userInfo)))
809             {
810 #if defined(__APPLE__) && defined(__arm__)
811                 // Restore FP state before leaving
812                 RestoreFPState(&oldMode);
813 #endif
814                 return result;
815             }
816 
817 #if defined(__APPLE__) && defined(__arm__)
818         // Restore FP state before leaving
819         RestoreFPState(&oldMode);
820 #endif
821 
822         return CL_SUCCESS;
823     }
824 
825     if (count >= MAX_COUNT)
826     {
827         log_error(
828             "Error: ThreadPool_Do count %d >= max threadpool count of %d\n",
829             count, MAX_COUNT);
830         return -1;
831     }
832 
833     // Enter critical region
834 #if defined(_WIN32)
835     EnterCriticalSection(gThreadPoolLock);
836 #else // !_WIN32
837     if ((err = pthread_mutex_lock(&gThreadPoolLock)))
838     {
839         switch (err)
840         {
841             case EDEADLK:
842                 log_error(
843                     "Error EDEADLK returned in ThreadPool_Do(). ThreadPool_Do "
844                     "is not designed to work recursively!\n");
845                 break;
846             case EINVAL:
847                 log_error("Error EINVAL returned in ThreadPool_Do(). How did "
848                           "we end up with an invalid gThreadPoolLock?\n");
849                 break;
850             default: break;
851         }
852         return err;
853     }
854 #endif // !_WIN32
855 
856     // Start modifying the job state observable by worker threads
857 #if defined(_WIN32)
858     EnterCriticalSection(cond_lock);
859 #else // !_WIN32
860     if ((err = pthread_mutex_lock(&cond_lock)))
861     {
862         log_error("Error %d from pthread_mutex_lock. Unable to wake up work "
863                   "threads. ThreadPool_Do failed.\n",
864                   err);
865         goto exit;
866     }
867 #endif // !_WIN32
868 
869     // Make sure the last thread done in the work pool doesn't signal us to wake
870     // before we get to the point where we are supposed to wait
871     //  That would cause a deadlock.
872 #if !defined(_WIN32)
873     if ((err = pthread_mutex_lock(&caller_cond_lock)))
874     {
875         log_error("Error %d from pthread_mutex_lock. Unable to block for work "
876                   "to finish. ThreadPool_Do failed.\n",
877                   err);
878         goto exit;
879     }
880 #endif // !_WIN32
881 
882     // Prime the worker threads to get going
883     jobError = CL_SUCCESS;
884     gRunCount = gJobCount = count;
885     gFunc_ptr = func_ptr;
886     gUserInfo = userInfo;
887 
888 #if defined(_WIN32)
889     ResetEvent(caller_event);
890     _WakeAllConditionVariable(cond_var);
891     LeaveCriticalSection(cond_lock);
892 #else // !_WIN32
893     if ((err = pthread_cond_broadcast(&cond_var)))
894     {
895         log_error("Error %d from pthread_cond_broadcast. Unable to wake up "
896                   "work threads. ThreadPool_Do failed.\n",
897                   err);
898         goto exit;
899     }
900     if ((err = pthread_mutex_unlock(&cond_lock)))
901     {
902         log_error("Error %d from pthread_mutex_unlock. Unable to wake up work "
903                   "threads. ThreadPool_Do failed.\n",
904                   err);
905         goto exit;
906     }
907 #endif // !_WIN32
908 
909     // block until they are done.  It would be slightly more efficient to do
910     // some of the work here though.
911     do
912     {
913 #if defined(_WIN32)
914         WaitForSingleObject(caller_event, INFINITE);
915 #else // !_WIN32
916         if ((err = pthread_cond_wait(&caller_cond_var, &caller_cond_lock)))
917         {
918             log_error("Error %d from pthread_cond_wait. Unable to block for "
919                       "work to finish. ThreadPool_Do failed.\n",
920                       err);
921             pthread_mutex_unlock(&caller_cond_lock);
922             goto exit;
923         }
924 #endif // !_WIN32
925     } while (gRunning);
926 #if !defined(_WIN32)
927     if ((err = pthread_mutex_unlock(&caller_cond_lock)))
928     {
929         log_error("Error %d from pthread_mutex_unlock. Unable to block for "
930                   "work to finish. ThreadPool_Do failed.\n",
931                   err);
932         goto exit;
933     }
934 #endif // !_WIN32
935 
936     err = jobError;
937 
938 #ifndef _WIN32
939 exit:
940 #endif
941     // exit critical region
942 #if defined(_WIN32)
943     LeaveCriticalSection(gThreadPoolLock);
944 #else // !_WIN32
945     newErr = pthread_mutex_unlock(&gThreadPoolLock);
946     if (newErr)
947     {
948         log_error("Error %d from pthread_mutex_unlock. Unable to exit critical "
949                   "region. ThreadPool_Do failed.\n",
950                   newErr);
951         return err;
952     }
953 #endif // !_WIN32
954 
955     return err;
956 }
957 
GetThreadCount(void)958 cl_uint GetThreadCount(void)
959 {
960     // Lazily set up our threads
961 #if defined(_MSC_VER) && (_WIN32_WINNT >= 0x600)
962     cl_int err = !_InitOnceExecuteOnce(&threadpool_init_control,
963                                        _ThreadPool_Init, NULL, NULL);
964 #elif defined(_WIN32)
965     if (threadpool_init_control == 0)
966     {
967 #warning This is buggy and race prone.  Find a better way.
968         ThreadPool_Init();
969         threadpool_init_control = 1;
970     }
971 #else
972     cl_int err = pthread_once(&threadpool_init_control, ThreadPool_Init);
973     if (err)
974     {
975         log_error("Error %d from pthread_once. Unable to init threads. "
976                   "ThreadPool_Do failed.\n",
977                   err);
978         return err;
979     }
980 #endif // !_WIN32
981 
982     if (gThreadCount < 1) return 1;
983 
984     return gThreadCount;
985 }
986 
987 #else
988 
989 #ifndef MY_OS_REALLY_REALLY_DOESNT_SUPPORT_THREADS
990 #error ThreadPool implementation has not been multithreaded for this operating system. You must multithread this section.
991 #endif
992 //
993 // We require multithreading in parts of the test as a means of simultaneously
// testing reentrancy requirements of the OpenCL API, while also checking
// results for correctness.
//
996 // A sample single threaded implementation follows, for documentation /
997 // bootstrapping purposes. It is not okay to use this for conformance testing!!!
998 //
999 // Exception:  If your operating system does not support multithreaded execution
1000 // of any kind, then you may use this code.
1001 //
1002 
// Non-atomic stand-in for ThreadPool_AtomicAdd: performs a fetch-and-add on
// *a and returns the value *a held before the addition.
cl_int ThreadPool_AtomicAdd(volatile cl_int *a, cl_int b)
{
    // This fallback path is single threaded by definition, so a plain
    // read-modify-write suffices. Operating systems with real
    // memory-barrier atomics should use those here instead.
    // The arithmetic is done in cl_uint so wraparound stays well defined.
    cl_uint prev = *a;
    *a = prev + b;
    return prev;
}
1014 
1015 // Blocking API that farms out count jobs to a thread pool.
1016 // It may return with some work undone if func_ptr() returns a non-zero
1017 // result.
ThreadPool_Do(TPFuncPtr func_ptr,cl_uint count,void * userInfo)1018 cl_int ThreadPool_Do(TPFuncPtr func_ptr, cl_uint count, void *userInfo)
1019 {
1020     cl_uint currentJob = 0;
1021     cl_int result = CL_SUCCESS;
1022 
1023 #ifndef MY_OS_REALLY_REALLY_DOESNT_SUPPORT_THREADS
1024     // THIS FUNCTION IS NOT INTENDED FOR USE!!
1025     log_error("ERROR:  Test must be multithreaded!\n");
1026     exit(-1);
1027 #else
1028     static int spewCount = 0;
1029 
1030     if (0 == spewCount)
1031     {
1032         log_info("\nWARNING:  The operating system is claimed not to support "
1033                  "threads of any sort. Running single threaded.\n");
1034         spewCount = 1;
1035     }
1036 #endif
1037 
1038     // The multithreaded code should mimic this behavior:
1039     for (currentJob = 0; currentJob < count; currentJob++)
1040         if ((result = func_ptr(currentJob, 0, userInfo))) return result;
1041 
1042     return CL_SUCCESS;
1043 }
1044 
GetThreadCount(void)1045 cl_uint GetThreadCount(void) { return 1; }
1046 
SetThreadCount(int count)1047 void SetThreadCount(int count)
1048 {
1049     if (count > 1) log_info("WARNING: SetThreadCount(%d) ignored\n", count);
1050 }
1051 
1052 #endif
1053