1 /*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Thread safe ringbuffer
22 *//*--------------------------------------------------------------------*/
23 #include "deRingbuffer.h"
24
25 #include "deInt32.h"
26 #include "deMemory.h"
27 #include "deSemaphore.h"
28
29 #include <stdlib.h>
30 #include <stdio.h>
31
32 struct deRingbuffer_s
33 {
34 int32_t blockSize;
35 int32_t blockCount;
36 int32_t *blockUsage;
37 uint8_t *buffer;
38
39 deSemaphore emptyCount;
40 deSemaphore fullCount;
41
42 int32_t outBlock;
43 int32_t outPos;
44
45 int32_t inBlock;
46 int32_t inPos;
47
48 bool stopNotified;
49 bool consumerStopping;
50 };
51
deRingbuffer_create(int32_t blockSize,int32_t blockCount)52 deRingbuffer *deRingbuffer_create(int32_t blockSize, int32_t blockCount)
53 {
54 deRingbuffer *ringbuffer = (deRingbuffer *)deCalloc(sizeof(deRingbuffer));
55
56 DE_ASSERT(ringbuffer);
57 DE_ASSERT(blockCount > 0);
58 DE_ASSERT(blockSize > 0);
59
60 ringbuffer->blockSize = blockSize;
61 ringbuffer->blockCount = blockCount;
62 ringbuffer->buffer = (uint8_t *)deMalloc(sizeof(uint8_t) * (size_t)blockSize * (size_t)blockCount);
63 ringbuffer->blockUsage = (int32_t *)deMalloc(sizeof(uint32_t) * (size_t)blockCount);
64 ringbuffer->emptyCount = deSemaphore_create(ringbuffer->blockCount, DE_NULL);
65 ringbuffer->fullCount = deSemaphore_create(0, DE_NULL);
66
67 if (!ringbuffer->buffer || !ringbuffer->blockUsage || !ringbuffer->emptyCount || !ringbuffer->fullCount)
68 {
69 if (ringbuffer->emptyCount)
70 deSemaphore_destroy(ringbuffer->emptyCount);
71 if (ringbuffer->fullCount)
72 deSemaphore_destroy(ringbuffer->fullCount);
73 deFree(ringbuffer->buffer);
74 deFree(ringbuffer->blockUsage);
75 deFree(ringbuffer);
76 return DE_NULL;
77 }
78
79 memset(ringbuffer->blockUsage, 0, sizeof(int32_t) * (size_t)blockCount);
80
81 ringbuffer->outBlock = 0;
82 ringbuffer->outPos = 0;
83
84 ringbuffer->inBlock = 0;
85 ringbuffer->inPos = 0;
86
87 ringbuffer->stopNotified = false;
88 ringbuffer->consumerStopping = false;
89
90 return ringbuffer;
91 }
92
deRingbuffer_stop(deRingbuffer * ringbuffer)93 void deRingbuffer_stop(deRingbuffer *ringbuffer)
94 {
95 /* Set notify to true and increment fullCount to let consumer continue */
96 ringbuffer->stopNotified = true;
97 deSemaphore_increment(ringbuffer->fullCount);
98 }
99
deRingbuffer_destroy(deRingbuffer * ringbuffer)100 void deRingbuffer_destroy(deRingbuffer *ringbuffer)
101 {
102 deSemaphore_destroy(ringbuffer->emptyCount);
103 deSemaphore_destroy(ringbuffer->fullCount);
104
105 free(ringbuffer->buffer);
106 free(ringbuffer->blockUsage);
107 free(ringbuffer);
108 }
109
producerStream_write(deStreamData * stream,const void * buf,int32_t bufSize,int32_t * written)110 static deStreamResult producerStream_write(deStreamData *stream, const void *buf, int32_t bufSize, int32_t *written)
111 {
112 deRingbuffer *ringbuffer = (deRingbuffer *)stream;
113
114 DE_ASSERT(stream);
115 /* If ringbuffer is stopping return error on write */
116 if (ringbuffer->stopNotified)
117 {
118 DE_ASSERT(false);
119 return DE_STREAMRESULT_ERROR;
120 }
121
122 *written = 0;
123
124 /* Write while more data available */
125 while (*written < bufSize)
126 {
127 int32_t writeSize = 0;
128 uint8_t *src = DE_NULL;
129 uint8_t *dst = DE_NULL;
130
131 /* If between blocks accuire new block */
132 if (ringbuffer->inPos == 0)
133 {
134 deSemaphore_decrement(ringbuffer->emptyCount);
135 }
136
137 writeSize = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
138 dst = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
139 src = (uint8_t *)buf + *written;
140
141 deMemcpy(dst, src, (size_t)writeSize);
142
143 ringbuffer->inPos += writeSize;
144 *written += writeSize;
145 ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
146
147 /* Block is full move to next one (or "between" this and next block) */
148 if (ringbuffer->inPos == ringbuffer->blockSize)
149 {
150 ringbuffer->inPos = 0;
151 ringbuffer->inBlock++;
152
153 if (ringbuffer->inBlock == ringbuffer->blockCount)
154 ringbuffer->inBlock = 0;
155 deSemaphore_increment(ringbuffer->fullCount);
156 }
157 }
158
159 return DE_STREAMRESULT_SUCCESS;
160 }
161
producerStream_flush(deStreamData * stream)162 static deStreamResult producerStream_flush(deStreamData *stream)
163 {
164 deRingbuffer *ringbuffer = (deRingbuffer *)stream;
165
166 DE_ASSERT(stream);
167
168 /* No blocks reserved by producer */
169 if (ringbuffer->inPos == 0)
170 return DE_STREAMRESULT_SUCCESS;
171
172 ringbuffer->inPos = 0;
173 ringbuffer->inBlock++;
174
175 if (ringbuffer->inBlock == ringbuffer->blockCount)
176 ringbuffer->inBlock = 0;
177
178 deSemaphore_increment(ringbuffer->fullCount);
179 return DE_STREAMRESULT_SUCCESS;
180 }
181
producerStream_deinit(deStreamData * stream)182 static deStreamResult producerStream_deinit(deStreamData *stream)
183 {
184 DE_ASSERT(stream);
185
186 producerStream_flush(stream);
187
188 /* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
189 return DE_STREAMRESULT_SUCCESS;
190 }
191
consumerStream_read(deStreamData * stream,void * buf,int32_t bufSize,int32_t * read)192 static deStreamResult consumerStream_read(deStreamData *stream, void *buf, int32_t bufSize, int32_t *read)
193 {
194 deRingbuffer *ringbuffer = (deRingbuffer *)stream;
195
196 DE_ASSERT(stream);
197
198 *read = 0;
199 DE_ASSERT(ringbuffer);
200
201 while (*read < bufSize)
202 {
203 int32_t writeSize = 0;
204 uint8_t *src = DE_NULL;
205 uint8_t *dst = DE_NULL;
206
207 /* If between blocks accuire new block */
208 if (ringbuffer->outPos == 0)
209 {
210 /* If consumer is set to stop after everything is consumed,
211 * do not block if there is no more input left
212 */
213 if (ringbuffer->consumerStopping)
214 {
215 /* Try to accuire new block, if can't there is no more input */
216 if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
217 {
218 return DE_STREAMRESULT_END_OF_STREAM;
219 }
220 }
221 else
222 {
223 /* If not stopping block until there is more input */
224 deSemaphore_decrement(ringbuffer->fullCount);
225 /* Ringbuffer was set to stop */
226 if (ringbuffer->stopNotified)
227 {
228 ringbuffer->consumerStopping = true;
229 }
230 }
231 }
232
233 writeSize = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
234 src = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
235 dst = (uint8_t *)buf + *read;
236
237 deMemcpy(dst, src, (size_t)writeSize);
238
239 ringbuffer->outPos += writeSize;
240 *read += writeSize;
241
242 /* Block is consumed move to next one (or "between" this and next block) */
243 if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
244 {
245 ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
246 ringbuffer->outPos = 0;
247 ringbuffer->outBlock++;
248
249 if (ringbuffer->outBlock == ringbuffer->blockCount)
250 ringbuffer->outBlock = 0;
251
252 deSemaphore_increment(ringbuffer->emptyCount);
253 }
254 }
255
256 return DE_STREAMRESULT_SUCCESS;
257 }
258
consumerStream_deinit(deStreamData * stream)259 static deStreamResult consumerStream_deinit(deStreamData *stream)
260 {
261 DE_ASSERT(stream);
262 DE_UNREF(stream);
263
264 return DE_STREAMRESULT_SUCCESS;
265 }
266
267 /* There are no sensible errors so status is always good */
empty_getStatus(deStreamData * stream)268 deStreamStatus empty_getStatus(deStreamData *stream)
269 {
270 DE_UNREF(stream);
271
272 return DE_STREAMSTATUS_GOOD;
273 }
274
275 /* There are no sensible errors in ringbuffer */
empty_getError(deStreamData * stream)276 static const char *empty_getError(deStreamData *stream)
277 {
278 DE_ASSERT(stream);
279 DE_UNREF(stream);
280 return DE_NULL;
281 }
282
283 static const deIOStreamVFTable producerStreamVFTable = {
284 DE_NULL, producerStream_write, empty_getError, producerStream_flush, producerStream_deinit, empty_getStatus};
285
286 static const deIOStreamVFTable consumerStreamVFTable = {consumerStream_read, DE_NULL, empty_getError, DE_NULL,
287 consumerStream_deinit, empty_getStatus};
288
deProducerStream_init(deOutStream * stream,deRingbuffer * buffer)289 void deProducerStream_init(deOutStream *stream, deRingbuffer *buffer)
290 {
291 stream->ioStream.streamData = (deStreamData *)buffer;
292 stream->ioStream.vfTable = &producerStreamVFTable;
293 }
294
deConsumerStream_init(deInStream * stream,deRingbuffer * buffer)295 void deConsumerStream_init(deInStream *stream, deRingbuffer *buffer)
296 {
297 stream->ioStream.streamData = (deStreamData *)buffer;
298 stream->ioStream.vfTable = &consumerStreamVFTable;
299 }
300