xref: /aosp_15_r20/external/deqp/framework/delibs/destream/deRingbuffer.c (revision 35238bce31c2a825756842865a792f8cf7f89930)
1 /*-------------------------------------------------------------------------
2  * drawElements Stream Library
3  * ---------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Thread safe ringbuffer
22  *//*--------------------------------------------------------------------*/
23 #include "deRingbuffer.h"
24 
25 #include "deInt32.h"
26 #include "deMemory.h"
27 #include "deSemaphore.h"
28 
29 #include <stdlib.h>
30 #include <stdio.h>
31 
32 struct deRingbuffer_s
33 {
34     int32_t blockSize;
35     int32_t blockCount;
36     int32_t *blockUsage;
37     uint8_t *buffer;
38 
39     deSemaphore emptyCount;
40     deSemaphore fullCount;
41 
42     int32_t outBlock;
43     int32_t outPos;
44 
45     int32_t inBlock;
46     int32_t inPos;
47 
48     bool stopNotified;
49     bool consumerStopping;
50 };
51 
deRingbuffer_create(int32_t blockSize,int32_t blockCount)52 deRingbuffer *deRingbuffer_create(int32_t blockSize, int32_t blockCount)
53 {
54     deRingbuffer *ringbuffer = (deRingbuffer *)deCalloc(sizeof(deRingbuffer));
55 
56     DE_ASSERT(ringbuffer);
57     DE_ASSERT(blockCount > 0);
58     DE_ASSERT(blockSize > 0);
59 
60     ringbuffer->blockSize  = blockSize;
61     ringbuffer->blockCount = blockCount;
62     ringbuffer->buffer     = (uint8_t *)deMalloc(sizeof(uint8_t) * (size_t)blockSize * (size_t)blockCount);
63     ringbuffer->blockUsage = (int32_t *)deMalloc(sizeof(uint32_t) * (size_t)blockCount);
64     ringbuffer->emptyCount = deSemaphore_create(ringbuffer->blockCount, DE_NULL);
65     ringbuffer->fullCount  = deSemaphore_create(0, DE_NULL);
66 
67     if (!ringbuffer->buffer || !ringbuffer->blockUsage || !ringbuffer->emptyCount || !ringbuffer->fullCount)
68     {
69         if (ringbuffer->emptyCount)
70             deSemaphore_destroy(ringbuffer->emptyCount);
71         if (ringbuffer->fullCount)
72             deSemaphore_destroy(ringbuffer->fullCount);
73         deFree(ringbuffer->buffer);
74         deFree(ringbuffer->blockUsage);
75         deFree(ringbuffer);
76         return DE_NULL;
77     }
78 
79     memset(ringbuffer->blockUsage, 0, sizeof(int32_t) * (size_t)blockCount);
80 
81     ringbuffer->outBlock = 0;
82     ringbuffer->outPos   = 0;
83 
84     ringbuffer->inBlock = 0;
85     ringbuffer->inPos   = 0;
86 
87     ringbuffer->stopNotified     = false;
88     ringbuffer->consumerStopping = false;
89 
90     return ringbuffer;
91 }
92 
deRingbuffer_stop(deRingbuffer * ringbuffer)93 void deRingbuffer_stop(deRingbuffer *ringbuffer)
94 {
95     /* Set notify to true and increment fullCount to let consumer continue */
96     ringbuffer->stopNotified = true;
97     deSemaphore_increment(ringbuffer->fullCount);
98 }
99 
deRingbuffer_destroy(deRingbuffer * ringbuffer)100 void deRingbuffer_destroy(deRingbuffer *ringbuffer)
101 {
102     deSemaphore_destroy(ringbuffer->emptyCount);
103     deSemaphore_destroy(ringbuffer->fullCount);
104 
105     free(ringbuffer->buffer);
106     free(ringbuffer->blockUsage);
107     free(ringbuffer);
108 }
109 
producerStream_write(deStreamData * stream,const void * buf,int32_t bufSize,int32_t * written)110 static deStreamResult producerStream_write(deStreamData *stream, const void *buf, int32_t bufSize, int32_t *written)
111 {
112     deRingbuffer *ringbuffer = (deRingbuffer *)stream;
113 
114     DE_ASSERT(stream);
115     /* If ringbuffer is stopping return error on write */
116     if (ringbuffer->stopNotified)
117     {
118         DE_ASSERT(false);
119         return DE_STREAMRESULT_ERROR;
120     }
121 
122     *written = 0;
123 
124     /* Write while more data available */
125     while (*written < bufSize)
126     {
127         int32_t writeSize = 0;
128         uint8_t *src      = DE_NULL;
129         uint8_t *dst      = DE_NULL;
130 
131         /* If between blocks accuire new block */
132         if (ringbuffer->inPos == 0)
133         {
134             deSemaphore_decrement(ringbuffer->emptyCount);
135         }
136 
137         writeSize = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
138         dst       = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
139         src       = (uint8_t *)buf + *written;
140 
141         deMemcpy(dst, src, (size_t)writeSize);
142 
143         ringbuffer->inPos += writeSize;
144         *written += writeSize;
145         ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
146 
147         /* Block is full move to next one (or "between" this and next block) */
148         if (ringbuffer->inPos == ringbuffer->blockSize)
149         {
150             ringbuffer->inPos = 0;
151             ringbuffer->inBlock++;
152 
153             if (ringbuffer->inBlock == ringbuffer->blockCount)
154                 ringbuffer->inBlock = 0;
155             deSemaphore_increment(ringbuffer->fullCount);
156         }
157     }
158 
159     return DE_STREAMRESULT_SUCCESS;
160 }
161 
producerStream_flush(deStreamData * stream)162 static deStreamResult producerStream_flush(deStreamData *stream)
163 {
164     deRingbuffer *ringbuffer = (deRingbuffer *)stream;
165 
166     DE_ASSERT(stream);
167 
168     /* No blocks reserved by producer */
169     if (ringbuffer->inPos == 0)
170         return DE_STREAMRESULT_SUCCESS;
171 
172     ringbuffer->inPos = 0;
173     ringbuffer->inBlock++;
174 
175     if (ringbuffer->inBlock == ringbuffer->blockCount)
176         ringbuffer->inBlock = 0;
177 
178     deSemaphore_increment(ringbuffer->fullCount);
179     return DE_STREAMRESULT_SUCCESS;
180 }
181 
producerStream_deinit(deStreamData * stream)182 static deStreamResult producerStream_deinit(deStreamData *stream)
183 {
184     DE_ASSERT(stream);
185 
186     producerStream_flush(stream);
187 
188     /* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
189     return DE_STREAMRESULT_SUCCESS;
190 }
191 
consumerStream_read(deStreamData * stream,void * buf,int32_t bufSize,int32_t * read)192 static deStreamResult consumerStream_read(deStreamData *stream, void *buf, int32_t bufSize, int32_t *read)
193 {
194     deRingbuffer *ringbuffer = (deRingbuffer *)stream;
195 
196     DE_ASSERT(stream);
197 
198     *read = 0;
199     DE_ASSERT(ringbuffer);
200 
201     while (*read < bufSize)
202     {
203         int32_t writeSize = 0;
204         uint8_t *src      = DE_NULL;
205         uint8_t *dst      = DE_NULL;
206 
207         /* If between blocks accuire new block */
208         if (ringbuffer->outPos == 0)
209         {
210             /* If consumer is set to stop after everything is consumed,
211              * do not block if there is no more input left
212              */
213             if (ringbuffer->consumerStopping)
214             {
215                 /* Try to accuire new block, if can't there is no more input */
216                 if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
217                 {
218                     return DE_STREAMRESULT_END_OF_STREAM;
219                 }
220             }
221             else
222             {
223                 /* If not stopping block until there is more input */
224                 deSemaphore_decrement(ringbuffer->fullCount);
225                 /* Ringbuffer was set to stop */
226                 if (ringbuffer->stopNotified)
227                 {
228                     ringbuffer->consumerStopping = true;
229                 }
230             }
231         }
232 
233         writeSize = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
234         src       = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
235         dst       = (uint8_t *)buf + *read;
236 
237         deMemcpy(dst, src, (size_t)writeSize);
238 
239         ringbuffer->outPos += writeSize;
240         *read += writeSize;
241 
242         /* Block is consumed move to next one (or "between" this and next block) */
243         if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
244         {
245             ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
246             ringbuffer->outPos                           = 0;
247             ringbuffer->outBlock++;
248 
249             if (ringbuffer->outBlock == ringbuffer->blockCount)
250                 ringbuffer->outBlock = 0;
251 
252             deSemaphore_increment(ringbuffer->emptyCount);
253         }
254     }
255 
256     return DE_STREAMRESULT_SUCCESS;
257 }
258 
consumerStream_deinit(deStreamData * stream)259 static deStreamResult consumerStream_deinit(deStreamData *stream)
260 {
261     DE_ASSERT(stream);
262     DE_UNREF(stream);
263 
264     return DE_STREAMRESULT_SUCCESS;
265 }
266 
267 /* There are no sensible errors so status is always good */
empty_getStatus(deStreamData * stream)268 deStreamStatus empty_getStatus(deStreamData *stream)
269 {
270     DE_UNREF(stream);
271 
272     return DE_STREAMSTATUS_GOOD;
273 }
274 
275 /* There are no sensible errors in ringbuffer */
empty_getError(deStreamData * stream)276 static const char *empty_getError(deStreamData *stream)
277 {
278     DE_ASSERT(stream);
279     DE_UNREF(stream);
280     return DE_NULL;
281 }
282 
283 static const deIOStreamVFTable producerStreamVFTable = {
284     DE_NULL, producerStream_write, empty_getError, producerStream_flush, producerStream_deinit, empty_getStatus};
285 
286 static const deIOStreamVFTable consumerStreamVFTable = {consumerStream_read,   DE_NULL,        empty_getError, DE_NULL,
287                                                         consumerStream_deinit, empty_getStatus};
288 
deProducerStream_init(deOutStream * stream,deRingbuffer * buffer)289 void deProducerStream_init(deOutStream *stream, deRingbuffer *buffer)
290 {
291     stream->ioStream.streamData = (deStreamData *)buffer;
292     stream->ioStream.vfTable    = &producerStreamVFTable;
293 }
294 
deConsumerStream_init(deInStream * stream,deRingbuffer * buffer)295 void deConsumerStream_init(deInStream *stream, deRingbuffer *buffer)
296 {
297     stream->ioStream.streamData = (deStreamData *)buffer;
298     stream->ioStream.vfTable    = &consumerStreamVFTable;
299 }
300