/* xref: /aosp_15_r20/external/lzma/C/MtCoder.c (revision f6dc9357d832569d4d1f5d24eacdb3935a1ae8e6) */
/* MtCoder.c -- Multi-thread Coder
2023-09-07 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 #include "MtCoder.h"
7 
8 #ifndef Z7_ST
9 
MtProgressThunk_Progress(ICompressProgressPtr pp,UInt64 inSize,UInt64 outSize)10 static SRes MtProgressThunk_Progress(ICompressProgressPtr pp, UInt64 inSize, UInt64 outSize)
11 {
12   Z7_CONTAINER_FROM_VTBL_TO_DECL_VAR_pp_vt_p(CMtProgressThunk)
13   UInt64 inSize2 = 0;
14   UInt64 outSize2 = 0;
15   if (inSize != (UInt64)(Int64)-1)
16   {
17     inSize2 = inSize - p->inSize;
18     p->inSize = inSize;
19   }
20   if (outSize != (UInt64)(Int64)-1)
21   {
22     outSize2 = outSize - p->outSize;
23     p->outSize = outSize;
24   }
25   return MtProgress_ProgressAdd(p->mtProgress, inSize2, outSize2);
26 }
27 
28 
/*
  Installs MtProgressThunk_Progress as the Progress entry of the thunk's vtable.
  No other field of *p is touched here.
*/
void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
{
  p->vt.Progress = MtProgressThunk_Progress;
}
33 
34 
35 
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
37 
38 
39 static THREAD_FUNC_DECL ThreadFunc(void *pp);
40 
41 
MtCoderThread_CreateAndStart(CMtCoderThread * t)42 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
43 {
44   WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->startEvent);
45   if (wres == 0)
46   {
47     t->stop = False;
48     if (!Thread_WasCreated(&t->thread))
49       wres = Thread_Create(&t->thread, ThreadFunc, t);
50     if (wres == 0)
51       wres = Event_Set(&t->startEvent);
52   }
53   if (wres == 0)
54     return SZ_OK;
55   return MY_SRes_HRESULT_FROM_WRes(wres);
56 }
57 
58 
/*
  Releases everything owned by the worker `t`:
    - if the thread was created: raises t->stop, sets t->startEvent so that
      ThreadFunc() can observe the flag, then waits for and closes the thread;
    - closes t->startEvent;
    - frees t->inBuf through the coder's allocBig allocator and clears the pointer.
*/
static void MtCoderThread_Destruct(CMtCoderThread *t)
{
  Byte *buf;

  if (Thread_WasCreated(&t->thread))
  {
    t->stop = 1;
    Event_Set(&t->startEvent);
    Thread_Wait_Close(&t->thread);
  }

  Event_Close(&t->startEvent);

  buf = t->inBuf;
  if (buf != NULL)
  {
    ISzAlloc_Free(t->mtCoder->allocBig, buf);
    t->inBuf = NULL;
  }
}
76 
77 
78 
79 
80 /*
81   ThreadFunc2() returns:
82   SZ_OK           - in all normal cases (even for stream error or memory allocation error)
83   SZ_ERROR_THREAD - in case of failure in system synch function
84 */
85 
ThreadFunc2(CMtCoderThread * t)86 static SRes ThreadFunc2(CMtCoderThread *t)
87 {
88   CMtCoder *mtc = t->mtCoder;
89 
90   for (;;)
91   {
92     unsigned bi;
93     SRes res;
94     SRes res2;
95     BoolInt finished;
96     unsigned bufIndex;
97     size_t size;
98     const Byte *inData;
99     UInt64 readProcessed = 0;
100 
101     RINOK_THREAD(Event_Wait(&mtc->readEvent))
102 
103     /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
104 
105     if (mtc->stopReading)
106     {
107       return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
108     }
109 
110     res = MtProgress_GetError(&mtc->mtProgress);
111 
112     size = 0;
113     inData = NULL;
114     finished = True;
115 
116     if (res == SZ_OK)
117     {
118       size = mtc->blockSize;
119       if (mtc->inStream)
120       {
121         if (!t->inBuf)
122         {
123           t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
124           if (!t->inBuf)
125             res = SZ_ERROR_MEM;
126         }
127         if (res == SZ_OK)
128         {
129           res = SeqInStream_ReadMax(mtc->inStream, t->inBuf, &size);
130           readProcessed = mtc->readProcessed + size;
131           mtc->readProcessed = readProcessed;
132         }
133         if (res != SZ_OK)
134         {
135           mtc->readRes = res;
136           /* after reading error - we can stop encoding of previous blocks */
137           MtProgress_SetError(&mtc->mtProgress, res);
138         }
139         else
140           finished = (size != mtc->blockSize);
141       }
142       else
143       {
144         size_t rem;
145         readProcessed = mtc->readProcessed;
146         rem = mtc->inDataSize - (size_t)readProcessed;
147         if (size > rem)
148           size = rem;
149         inData = mtc->inData + (size_t)readProcessed;
150         readProcessed += size;
151         mtc->readProcessed = readProcessed;
152         finished = (mtc->inDataSize == (size_t)readProcessed);
153       }
154     }
155 
156     /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
157 
158     res2 = SZ_OK;
159 
160     if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
161     {
162       res2 = SZ_ERROR_THREAD;
163       if (res == SZ_OK)
164       {
165         res = res2;
166         // MtProgress_SetError(&mtc->mtProgress, res);
167       }
168     }
169 
170     bi = mtc->blockIndex;
171 
172     if (++mtc->blockIndex >= mtc->numBlocksMax)
173       mtc->blockIndex = 0;
174 
175     bufIndex = (unsigned)(int)-1;
176 
177     if (res == SZ_OK)
178       res = MtProgress_GetError(&mtc->mtProgress);
179 
180     if (res != SZ_OK)
181       finished = True;
182 
183     if (!finished)
184     {
185       if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
186           && mtc->expectedDataSize != readProcessed)
187       {
188         res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
189         if (res == SZ_OK)
190           mtc->numStartedThreads++;
191         else
192         {
193           MtProgress_SetError(&mtc->mtProgress, res);
194           finished = True;
195         }
196       }
197     }
198 
199     if (finished)
200       mtc->stopReading = True;
201 
202     RINOK_THREAD(Event_Set(&mtc->readEvent))
203 
204     if (res2 != SZ_OK)
205       return res2;
206 
207     if (res == SZ_OK)
208     {
209       CriticalSection_Enter(&mtc->cs);
210       bufIndex = mtc->freeBlockHead;
211       mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
212       CriticalSection_Leave(&mtc->cs);
213 
214       res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
215           mtc->inStream ? t->inBuf : inData, size, finished);
216 
217       // MtProgress_Reinit(&mtc->mtProgress, t->index);
218 
219       if (res != SZ_OK)
220         MtProgress_SetError(&mtc->mtProgress, res);
221     }
222 
223     {
224       CMtCoderBlock *block = &mtc->blocks[bi];
225       block->res = res;
226       block->bufIndex = bufIndex;
227       block->finished = finished;
228     }
229 
230     #ifdef MTCODER_USE_WRITE_THREAD
231       RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
232     #else
233     {
234       unsigned wi;
235       {
236         CriticalSection_Enter(&mtc->cs);
237         wi = mtc->writeIndex;
238         if (wi == bi)
239           mtc->writeIndex = (unsigned)(int)-1;
240         else
241           mtc->ReadyBlocks[bi] = True;
242         CriticalSection_Leave(&mtc->cs);
243       }
244 
245       if (wi != bi)
246       {
247         if (res != SZ_OK || finished)
248           return 0;
249         continue;
250       }
251 
252       if (mtc->writeRes != SZ_OK)
253         res = mtc->writeRes;
254 
255       for (;;)
256       {
257         if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
258         {
259           res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
260           if (res != SZ_OK)
261           {
262             mtc->writeRes = res;
263             MtProgress_SetError(&mtc->mtProgress, res);
264           }
265         }
266 
267         if (++wi >= mtc->numBlocksMax)
268           wi = 0;
269         {
270           BoolInt isReady;
271 
272           CriticalSection_Enter(&mtc->cs);
273 
274           if (bufIndex != (unsigned)(int)-1)
275           {
276             mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
277             mtc->freeBlockHead = bufIndex;
278           }
279 
280           isReady = mtc->ReadyBlocks[wi];
281 
282           if (isReady)
283             mtc->ReadyBlocks[wi] = False;
284           else
285             mtc->writeIndex = wi;
286 
287           CriticalSection_Leave(&mtc->cs);
288 
289           RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
290 
291           if (!isReady)
292             break;
293         }
294 
295         {
296           CMtCoderBlock *block = &mtc->blocks[wi];
297           if (res == SZ_OK && block->res != SZ_OK)
298             res = block->res;
299           bufIndex = block->bufIndex;
300           finished = block->finished;
301         }
302       }
303     }
304     #endif
305 
306     if (finished || res != SZ_OK)
307       return 0;
308   }
309 }
310 
311 
ThreadFunc(void * pp)312 static THREAD_FUNC_DECL ThreadFunc(void *pp)
313 {
314   CMtCoderThread *t = (CMtCoderThread *)pp;
315   for (;;)
316   {
317     if (Event_Wait(&t->startEvent) != 0)
318       return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
319     if (t->stop)
320       return 0;
321     {
322       SRes res = ThreadFunc2(t);
323       CMtCoder *mtc = t->mtCoder;
324       if (res != SZ_OK)
325       {
326         MtProgress_SetError(&mtc->mtProgress, res);
327       }
328 
329       #ifndef MTCODER_USE_WRITE_THREAD
330       {
331         unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
332         if (numFinished == mtc->numStartedThreads)
333           if (Event_Set(&mtc->finishedEvent) != 0)
334             return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
335       }
336       #endif
337     }
338   }
339 }
340 
341 
342 
/*
  Puts *p into its initial state before first use:
    - configuration fields are cleared (sizes 0, pointers NULL);
      expectedDataSize is set to (UInt64)(Int64)-1;
    - all events, the semaphore and the thread objects are constructed
      (construction only - nothing is created or started here);
    - every entry of p->threads[] gets its back pointer, index, empty input
      buffer and cleared stop flag;
    - both critical sections (p->cs and p->mtProgress.cs) are initialized.
*/
void MtCoder_Construct(CMtCoder *p)
{
  unsigned k;

  /* configuration */
  p->blockSize = 0;
  p->numThreadsMax = 0;
  p->expectedDataSize = (UInt64)(Int64)-1;

  /* input */
  p->inStream = NULL;
  p->inData = NULL;
  p->inDataSize = 0;

  /* callbacks and allocator */
  p->progress = NULL;
  p->allocBig = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->allocatedBufsSize = 0;

  /* synchronization objects shared by all workers */
  Event_Construct(&p->readEvent);
  Semaphore_Construct(&p->blocksSemaphore);

  /* per-worker state */
  for (k = 0; k < MTCODER_THREADS_MAX; k++)
  {
    CMtCoderThread *worker = &p->threads[k];
    worker->mtCoder = p;
    worker->index = k;
    worker->inBuf = NULL;
    worker->stop = False;
    Event_Construct(&worker->startEvent);
    Thread_CONSTRUCT(&worker->thread)
  }

  #ifdef MTCODER_USE_WRITE_THREAD
    for (k = 0; k < MTCODER_BLOCKS_MAX; k++)
      Event_Construct(&p->writeEvents[k]);
  #else
    Event_Construct(&p->finishedEvent);
  #endif

  CriticalSection_Init(&p->cs);
  CriticalSection_Init(&p->mtProgress.cs);
}
387 
388 
389 
390 
/*
  Destroys all workers (MtCoderThread_Destruct() for every entry of
  p->threads[]) and then closes the shared synchronization objects:
  readEvent, blocksSemaphore and either writeEvents[] or finishedEvent,
  depending on MTCODER_USE_WRITE_THREAD.

  The critical sections are not deleted here; MtCoder_Destruct() does that.
*/
static void MtCoder_Free(CMtCoder *p)
{
  unsigned k;

  /* kept disabled, as in the original code: */
  /*
  p->stopReading = True;
  if (Event_IsCreated(&p->readEvent))
    Event_Set(&p->readEvent);
  */

  for (k = 0; k < MTCODER_THREADS_MAX; k++)
  {
    MtCoderThread_Destruct(&p->threads[k]);
  }

  Event_Close(&p->readEvent);
  Semaphore_Close(&p->blocksSemaphore);

  #ifdef MTCODER_USE_WRITE_THREAD
    for (k = 0; k < MTCODER_BLOCKS_MAX; k++)
    {
      Event_Close(&p->writeEvents[k]);
    }
  #else
    Event_Close(&p->finishedEvent);
  #endif
}
414 
415 
/*
  Final cleanup of *p: frees workers and synchronization objects via
  MtCoder_Free(), then deletes the two critical sections that
  MtCoder_Construct() initialized (p->cs and p->mtProgress.cs).
*/
void MtCoder_Destruct(CMtCoder *p)
{
  MtCoder_Free(p);

  CriticalSection_Delete(&p->cs);
  CriticalSection_Delete(&p->mtProgress.cs);
}
423 
424 
MtCoder_Code(CMtCoder * p)425 SRes MtCoder_Code(CMtCoder *p)
426 {
427   unsigned numThreads = p->numThreadsMax;
428   unsigned numBlocksMax;
429   unsigned i;
430   SRes res = SZ_OK;
431 
432   if (numThreads > MTCODER_THREADS_MAX)
433       numThreads = MTCODER_THREADS_MAX;
434   numBlocksMax = MTCODER_GET_NUM_BLOCKS_FROM_THREADS(numThreads);
435 
436   if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
437   if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
438   if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
439 
440   if (numBlocksMax > MTCODER_BLOCKS_MAX)
441       numBlocksMax = MTCODER_BLOCKS_MAX;
442 
443   if (p->blockSize != p->allocatedBufsSize)
444   {
445     for (i = 0; i < MTCODER_THREADS_MAX; i++)
446     {
447       CMtCoderThread *t = &p->threads[i];
448       if (t->inBuf)
449       {
450         ISzAlloc_Free(p->allocBig, t->inBuf);
451         t->inBuf = NULL;
452       }
453     }
454     p->allocatedBufsSize = p->blockSize;
455   }
456 
457   p->readRes = SZ_OK;
458 
459   MtProgress_Init(&p->mtProgress, p->progress);
460 
461   #ifdef MTCODER_USE_WRITE_THREAD
462     for (i = 0; i < numBlocksMax; i++)
463     {
464       RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->writeEvents[i]))
465     }
466   #else
467     RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
468   #endif
469 
470   {
471     RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->readEvent))
472     RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, (UInt32)numBlocksMax, (UInt32)numBlocksMax))
473   }
474 
475   for (i = 0; i < MTCODER_BLOCKS_MAX - 1; i++)
476     p->freeBlockList[i] = i + 1;
477   p->freeBlockList[MTCODER_BLOCKS_MAX - 1] = (unsigned)(int)-1;
478   p->freeBlockHead = 0;
479 
480   p->readProcessed = 0;
481   p->blockIndex = 0;
482   p->numBlocksMax = numBlocksMax;
483   p->stopReading = False;
484 
485   #ifndef MTCODER_USE_WRITE_THREAD
486     p->writeIndex = 0;
487     p->writeRes = SZ_OK;
488     for (i = 0; i < MTCODER_BLOCKS_MAX; i++)
489       p->ReadyBlocks[i] = False;
490     p->numFinishedThreads = 0;
491   #endif
492 
493   p->numStartedThreadsLimit = numThreads;
494   p->numStartedThreads = 0;
495 
496   // for (i = 0; i < numThreads; i++)
497   {
498     CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
499     RINOK(MtCoderThread_CreateAndStart(nextThread))
500   }
501 
502   RINOK_THREAD(Event_Set(&p->readEvent))
503 
504   #ifdef MTCODER_USE_WRITE_THREAD
505   {
506     unsigned bi = 0;
507 
508     for (;; bi++)
509     {
510       if (bi >= numBlocksMax)
511         bi = 0;
512 
513       RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
514 
515       {
516         const CMtCoderBlock *block = &p->blocks[bi];
517         unsigned bufIndex = block->bufIndex;
518         BoolInt finished = block->finished;
519         if (res == SZ_OK && block->res != SZ_OK)
520           res = block->res;
521 
522         if (bufIndex != (unsigned)(int)-1)
523         {
524           if (res == SZ_OK)
525           {
526             res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
527             if (res != SZ_OK)
528               MtProgress_SetError(&p->mtProgress, res);
529           }
530 
531           CriticalSection_Enter(&p->cs);
532           {
533             p->freeBlockList[bufIndex] = p->freeBlockHead;
534             p->freeBlockHead = bufIndex;
535           }
536           CriticalSection_Leave(&p->cs);
537         }
538 
539         RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
540 
541         if (finished)
542           break;
543       }
544     }
545   }
546   #else
547   {
548     WRes wres = Event_Wait(&p->finishedEvent);
549     res = MY_SRes_HRESULT_FROM_WRes(wres);
550   }
551   #endif
552 
553   if (res == SZ_OK)
554     res = p->readRes;
555 
556   if (res == SZ_OK)
557     res = p->mtProgress.res;
558 
559   #ifndef MTCODER_USE_WRITE_THREAD
560     if (res == SZ_OK)
561       res = p->writeRes;
562   #endif
563 
564   if (res != SZ_OK)
565     MtCoder_Free(p);
566   return res;
567 }
568 
569 #endif
570 
571 #undef RINOK_THREAD
572