1 /* MtCoder.c -- Multi-thread Coder
2 2023-09-07 : Igor Pavlov : Public domain */
3
4 #include "Precomp.h"
5
6 #include "MtCoder.h"
7
8 #ifndef Z7_ST
9
MtProgressThunk_Progress(ICompressProgressPtr pp,UInt64 inSize,UInt64 outSize)10 static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize)
11 {
12 Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk)
13 UInt64 inSize2 = 0;
14 UInt64 outSize2 = 0;
15 if (inSize != (UInt64)(Int64)-1)
16 {
17 inSize2 = inSize - p->inSize;
18 p->inSize = inSize;
19 }
20 if (outSize != (UInt64)(Int64)-1)
21 {
22 outSize2 = outSize - p->outSize;
23 p->outSize = outSize;
24 }
25 return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2);
26 }
27
28
MtProgressThunk_CreateVTable(CMtProgressThunk * p)29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
30 {
31 p->vt.Progress = MtProgressThunk_Progress;
32 }
33
34
35
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37
38
39 static THREAD_FUNC_DECL ThreadFunc(void *pp);
40
41
MtCoderThread_CreateAndStart(CMtCoderThread * t)42 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
43 {
44 WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
45 if (wres == 0)
46 {
47 t->stop = False;
48 if (!Thread_WasCreated(&t->thread))
49 wres = Thread_Create(&t->thread, ThreadFunc, t);
50 if (wres == 0)
51 wres = Event_Set(&t->startEvent);
52 }
53 if (wres == 0)
54 return SZ_OK;
55 return MY_SRes_HRESULT_FROM_WRes(wres);
56 }
57
58
MtCoderThread_Destruct(CMtCoderThread * t)59 static void MtCoderThread_Destruct(CMtCoderThread *t)
60 {
61 if (Thread_WasCreated(&t->thread))
62 {
63 t->stop = 1;
64 Event_Set(&t->startEvent);
65 Thread_Wait_Close(&t->thread);
66 }
67
68 Event_Close(&t->startEvent);
69
70 if (t->inBuf)
71 {
72 ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
73 t->inBuf = NULL;
74 }
75 }
76
77
78
79
80 /*
81 ThreadFunc2() returns:
82 SZ_OK - in all normal cases (even for stream error or memory allocation error)
83 SZ_ERROR_THREAD - in case of failure in system synch function
84 */
85
ThreadFunc2(CMtCoderThread * t)86 static SRes ThreadFunc2(CMtCoderThread *t)
87 {
88 CMtCoder *mtc = t->mtCoder;
89
90 for (;;)
91 {
92 unsigned bi;
93 SRes res;
94 SRes res2;
95 BoolInt finished;
96 unsigned bufIndex;
97 size_t size;
98 const Byte *inData;
99 UInt64 readProcessed = 0;
100
101 RINOK_THREAD(Event_Wait(&mtc->readEvent))
102
103 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
104
105 if (mtc->stopReading)
106 {
107 return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
108 }
109
110 res = MtProgress_GetError(&mtc->mtProgress);
111
112 size = 0;
113 inData = NULL;
114 finished = True;
115
116 if (res == SZ_OK)
117 {
118 size = mtc->blockSize;
119 if (mtc->inStream)
120 {
121 if (!t->inBuf)
122 {
123 t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
124 if (!t->inBuf)
125 res = SZ_ERROR_MEM;
126 }
127 if (res == SZ_OK)
128 {
129 res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size);
130 readProcessed = mtc->readProcessed + size;
131 mtc->readProcessed = readProcessed;
132 }
133 if (res != SZ_OK)
134 {
135 mtc->readRes = res;
136 /* after reading error - we can stop encoding of previous blocks */
137 MtProgress_SetError(&mtc->mtProgress, res);
138 }
139 else
140 finished = (size != mtc->blockSize);
141 }
142 else
143 {
144 size_t rem;
145 readProcessed = mtc->readProcessed;
146 rem = mtc->inDataSize - (size_t)readProcessed;
147 if (size > rem)
148 size = rem;
149 inData = mtc->inData + (size_t)readProcessed;
150 readProcessed += size;
151 mtc->readProcessed = readProcessed;
152 finished = (mtc->inDataSize == (size_t)readProcessed);
153 }
154 }
155
156 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
157
158 res2 = SZ_OK;
159
160 if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
161 {
162 res2 = SZ_ERROR_THREAD;
163 if (res == SZ_OK)
164 {
165 res = res2;
166 // MtProgress_SetError(&mtc->mtProgress, res);
167 }
168 }
169
170 bi = mtc->blockIndex;
171
172 if (++mtc->blockIndex >= mtc->numBlocksMax)
173 mtc->blockIndex = 0;
174
175 bufIndex = (unsigned)(int)-1;
176
177 if (res == SZ_OK)
178 res = MtProgress_GetError(&mtc->mtProgress);
179
180 if (res != SZ_OK)
181 finished = True;
182
183 if (!finished)
184 {
185 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
186 && mtc->expectedDataSize != readProcessed)
187 {
188 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
189 if (res == SZ_OK)
190 mtc->numStartedThreads++;
191 else
192 {
193 MtProgress_SetError(&mtc->mtProgress, res);
194 finished = True;
195 }
196 }
197 }
198
199 if (finished)
200 mtc->stopReading = True;
201
202 RINOK_THREAD(Event_Set(&mtc->readEvent))
203
204 if (res2 != SZ_OK)
205 return res2;
206
207 if (res == SZ_OK)
208 {
209 CriticalSection_Enter(&mtc->cs);
210 bufIndex = mtc->freeBlockHead;
211 mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
212 CriticalSection_Leave(&mtc->cs);
213
214 res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
215 mtc->inStream ? t->inBuf : inData, size, finished);
216
217 // MtProgress_Reinit(&mtc->mtProgress, t->index);
218
219 if (res != SZ_OK)
220 MtProgress_SetError(&mtc->mtProgress, res);
221 }
222
223 {
224 CMtCoderBlock *block = &mtc->blocks[bi];
225 block->res = res;
226 block->bufIndex = bufIndex;
227 block->finished = finished;
228 }
229
230 #ifdef MTCODER_USE_WRITE_THREAD
231 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
232 #else
233 {
234 unsigned wi;
235 {
236 CriticalSection_Enter(&mtc->cs);
237 wi = mtc->writeIndex;
238 if (wi == bi)
239 mtc->writeIndex = (unsigned)(int)-1;
240 else
241 mtc->ReadyBlocks[bi] = True;
242 CriticalSection_Leave(&mtc->cs);
243 }
244
245 if (wi != bi)
246 {
247 if (res != SZ_OK || finished)
248 return 0;
249 continue;
250 }
251
252 if (mtc->writeRes != SZ_OK)
253 res = mtc->writeRes;
254
255 for (;;)
256 {
257 if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
258 {
259 res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
260 if (res != SZ_OK)
261 {
262 mtc->writeRes = res;
263 MtProgress_SetError(&mtc->mtProgress, res);
264 }
265 }
266
267 if (++wi >= mtc->numBlocksMax)
268 wi = 0;
269 {
270 BoolInt isReady;
271
272 CriticalSection_Enter(&mtc->cs);
273
274 if (bufIndex != (unsigned)(int)-1)
275 {
276 mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
277 mtc->freeBlockHead = bufIndex;
278 }
279
280 isReady = mtc->ReadyBlocks[wi];
281
282 if (isReady)
283 mtc->ReadyBlocks[wi] = False;
284 else
285 mtc->writeIndex = wi;
286
287 CriticalSection_Leave(&mtc->cs);
288
289 RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
290
291 if (!isReady)
292 break;
293 }
294
295 {
296 CMtCoderBlock *block = &mtc->blocks[wi];
297 if (res == SZ_OK && block->res != SZ_OK)
298 res = block->res;
299 bufIndex = block->bufIndex;
300 finished = block->finished;
301 }
302 }
303 }
304 #endif
305
306 if (finished || res != SZ_OK)
307 return 0;
308 }
309 }
310
311
ThreadFunc(void * pp)312 static THREAD_FUNC_DECL ThreadFunc(void *pp)
313 {
314 CMtCoderThread *t = (CMtCoderThread *)pp;
315 for (;;)
316 {
317 if (Event_Wait(&t->startEvent) != 0)
318 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
319 if (t->stop)
320 return 0;
321 {
322 SRes res = ThreadFunc2(t);
323 CMtCoder *mtc = t->mtCoder;
324 if (res != SZ_OK)
325 {
326 MtProgress_SetError(&mtc->mtProgress, res);
327 }
328
329 #ifndef MTCODER_USE_WRITE_THREAD
330 {
331 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
332 if (numFinished == mtc->numStartedThreads)
333 if (Event_Set(&mtc->finishedEvent) != 0)
334 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
335 }
336 #endif
337 }
338 }
339 }
340
341
342
MtCoder_Construct(CMtCoder * p)343 void MtCoder_Construct(CMtCoder *p)
344 {
345 unsigned i;
346
347 p->blockSize = 0;
348 p->numThreadsMax = 0;
349 p->expectedDataSize = (UInt64)(Int64)-1;
350
351 p->inStream = NULL;
352 p->inData = NULL;
353 p->inDataSize = 0;
354
355 p->progress = NULL;
356 p->allocBig = NULL;
357
358 p->mtCallback = NULL;
359 p->mtCallbackObject = NULL;
360
361 p->allocatedBufsSize = 0;
362
363 Event_Construct(&p->readEvent);
364 Semaphore_Construct(&p->blocksSemaphore);
365
366 for (i = 0; i < MTCODER_THREADS_MAX; i++)
367 {
368 CMtCoderThread *t = &p->threads[i];
369 t->mtCoder = p;
370 t->index = i;
371 t->inBuf = NULL;
372 t->stop = False;
373 Event_Construct(&t->startEvent);
374 Thread_CONSTRUCT(&t->thread)
375 }
376
377 #ifdef MTCODER_USE_WRITE_THREAD
378 for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
379 Event_Construct(&p->writeEvents[i]);
380 #else
381 Event_Construct(&p->finishedEvent);
382 #endif
383
384 CriticalSection_Init(&p->cs);
385 CriticalSection_Init(&p->mtProgress.cs);
386 }
387
388
389
390
MtCoder_Free(CMtCoder * p)391 static void MtCoder_Free(CMtCoder *p)
392 {
393 unsigned i;
394
395 /*
396 p->stopReading = True;
397 if (Event_IsCreated(&p->readEvent))
398 Event_Set(&p->readEvent);
399 */
400
401 for (i = 0; i < MTCODER_THREADS_MAX; i++)
402 MtCoderThread_Destruct(&p->threads[i]);
403
404 Event_Close(&p->readEvent);
405 Semaphore_Close(&p->blocksSemaphore);
406
407 #ifdef MTCODER_USE_WRITE_THREAD
408 for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
409 Event_Close(&p->writeEvents[i]);
410 #else
411 Event_Close(&p->finishedEvent);
412 #endif
413 }
414
415
MtCoder_Destruct(CMtCoder * p)416 void MtCoder_Destruct(CMtCoder *p)
417 {
418 MtCoder_Free(p);
419
420 CriticalSection_Delete(&p->cs);
421 CriticalSection_Delete(&p->mtProgress.cs);
422 }
423
424
MtCoder_Code(CMtCoder * p)425 SRes MtCoder_Code(CMtCoder *p)
426 {
427 unsigned numThreads = p->numThreadsMax;
428 unsigned numBlocksMax;
429 unsigned i;
430 SRes res = SZ_OK;
431
432 if (numThreads > MTCODER_THREADS_MAX)
433 numThreads = MTCODER_THREADS_MAX;
434 numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
435
436 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
437 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
438 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
439
440 if (numBlocksMax > MTCODER_BLOCKS_MAX)
441 numBlocksMax = MTCODER_BLOCKS_MAX;
442
443 if (p->blockSize != p->allocatedBufsSize)
444 {
445 for (i = 0; i < MTCODER_THREADS_MAX; i++)
446 {
447 CMtCoderThread *t = &p->threads[i];
448 if (t->inBuf)
449 {
450 ISzAlloc_Free(p->allocBig, t->inBuf);
451 t->inBuf = NULL;
452 }
453 }
454 p->allocatedBufsSize = p->blockSize;
455 }
456
457 p->readRes = SZ_OK;
458
459 MtProgress_Init(&p->mtProgress, p->progress);
460
461 #ifdef MTCODER_USE_WRITE_THREAD
462 for (i = 0; i < numBlocksMax; i++)
463 {
464 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i]))
465 }
466 #else
467 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
468 #endif
469
470 {
471 RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent))
472 RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, (UInt32)numBlocksMax, (UInt32)numBlocksMax))
473 }
474
475 for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++)
476 p->freeBlockList[i] = i + 1;
477 p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1;
478 p->freeBlockHead = 0;
479
480 p->readProcessed = 0;
481 p->blockIndex = 0;
482 p->numBlocksMax = numBlocksMax;
483 p->stopReading = False;
484
485 #ifndef MTCODER_USE_WRITE_THREAD
486 p->writeIndex = 0;
487 p->writeRes = SZ_OK;
488 for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
489 p->ReadyBlocks[i] = False;
490 p->numFinishedThreads = 0;
491 #endif
492
493 p->numStartedThreadsLimit = numThreads;
494 p->numStartedThreads = 0;
495
496 // for (i = 0; i < numThreads; i++)
497 {
498 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
499 RINOK(MtCoderThread_CreateAndStart(nextThread))
500 }
501
502 RINOK_THREAD(Event_Set(&p->readEvent))
503
504 #ifdef MTCODER_USE_WRITE_THREAD
505 {
506 unsigned bi = 0;
507
508 for (;; bi++)
509 {
510 if (bi >= numBlocksMax)
511 bi = 0;
512
513 RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
514
515 {
516 const CMtCoderBlock *block = &p->blocks[bi];
517 unsigned bufIndex = block->bufIndex;
518 BoolInt finished = block->finished;
519 if (res == SZ_OK && block->res != SZ_OK)
520 res = block->res;
521
522 if (bufIndex != (unsigned)(int)-1)
523 {
524 if (res == SZ_OK)
525 {
526 res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
527 if (res != SZ_OK)
528 MtProgress_SetError(&p->mtProgress, res);
529 }
530
531 CriticalSection_Enter(&p->cs);
532 {
533 p->freeBlockList[bufIndex] = p->freeBlockHead;
534 p->freeBlockHead = bufIndex;
535 }
536 CriticalSection_Leave(&p->cs);
537 }
538
539 RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
540
541 if (finished)
542 break;
543 }
544 }
545 }
546 #else
547 {
548 WRes wres = Event_Wait(&p->finishedEvent);
549 res = MY_SRes_HRESULT_FROM_WRes(wres);
550 }
551 #endif
552
553 if (res == SZ_OK)
554 res = p->readRes;
555
556 if (res == SZ_OK)
557 res = p->mtProgress.res;
558
559 #ifndef MTCODER_USE_WRITE_THREAD
560 if (res == SZ_OK)
561 res = p->writeRes;
562 #endif
563
564 if (res != SZ_OK)
565 MtCoder_Free(p);
566 return res;
567 }
568
569 #endif
570
571 #undef RINOK_THREAD
572