1 #ifndef _DEBLOCKBUFFER_HPP
2 #define _DEBLOCKBUFFER_HPP
3 /*-------------------------------------------------------------------------
4 * drawElements C++ Base Library
5 * -----------------------------
6 *
7 * Copyright 2014 The Android Open Source Project
8 *
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 *
13 * http://www.apache.org/licenses/LICENSE-2.0
14 *
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 *
21 *//*!
22 * \file
23 * \brief Block-based thread-safe queue.
24 *//*--------------------------------------------------------------------*/
25
26 #include "deBlockBuffer.hpp"
27 #include "deMutex.hpp"
28 #include "deSemaphore.h"
29
30 #include <exception>
31
32 namespace de
33 {
34
35 void BlockBuffer_selfTest(void);
36
37 class BufferCanceledException : public std::exception
38 {
39 public:
BufferCanceledException(void)40 inline BufferCanceledException(void)
41 {
42 }
~BufferCanceledException(void)43 inline ~BufferCanceledException(void) throw()
44 {
45 }
46
what(void) const47 const char *what(void) const throw()
48 {
49 return "BufferCanceledException";
50 }
51 };
52
53 template <typename T>
54 class BlockBuffer
55 {
56 public:
57 typedef BufferCanceledException CanceledException;
58
59 BlockBuffer(int blockSize, int numBlocks);
60 ~BlockBuffer(void);
61
62 void clear(void); //!< Resets buffer. Will block until pending writes and reads have completed.
63
64 void write(int numElements, const T *elements);
65 int tryWrite(int numElements, const T *elements);
66 void flush(void);
67 bool tryFlush(void);
68
69 void read(int numElements, T *elements);
70 int tryRead(int numElements, T *elements);
71
72 void cancel(
73 void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException.
isCanceled(void) const74 bool isCanceled(void) const
75 {
76 return !!m_canceled;
77 }
78
79 private:
80 BlockBuffer(const BlockBuffer &other);
81 BlockBuffer &operator=(const BlockBuffer &other);
82
83 int writeToCurrentBlock(int numElements, const T *elements, bool blocking);
84 int readFromCurrentBlock(int numElements, T *elements, bool blocking);
85
86 void flushWriteBlock(void);
87
88 deSemaphore m_fill; //!< Block fill count.
89 deSemaphore m_empty; //!< Block empty count.
90
91 int m_writeBlock; //!< Current write block ndx.
92 int m_writePos; //!< Position in block. 0 if block is not yet acquired.
93
94 int m_readBlock; //!< Current read block ndx.
95 int m_readPos; //!< Position in block. 0 if block is not yet acquired.
96
97 int m_blockSize;
98 int m_numBlocks;
99
100 T *m_elements;
101 int *m_numUsedInBlock;
102
103 Mutex m_writeLock;
104 Mutex m_readLock;
105
106 volatile uint32_t m_canceled;
107 } DE_WARN_UNUSED_TYPE;
108
109 template <typename T>
BlockBuffer(int blockSize,int numBlocks)110 BlockBuffer<T>::BlockBuffer(int blockSize, int numBlocks)
111 : m_fill(0)
112 , m_empty(0)
113 , m_writeBlock(0)
114 , m_writePos(0)
115 , m_readBlock(0)
116 , m_readPos(0)
117 , m_blockSize(blockSize)
118 , m_numBlocks(numBlocks)
119 , m_elements(DE_NULL)
120 , m_numUsedInBlock(DE_NULL)
121 , m_writeLock()
122 , m_readLock()
123 , m_canceled(false)
124 {
125 DE_ASSERT(blockSize > 0);
126 DE_ASSERT(numBlocks > 0);
127
128 try
129 {
130 m_elements = new T[m_numBlocks * m_blockSize];
131 m_numUsedInBlock = new int[m_numBlocks];
132 }
133 catch (...)
134 {
135 delete[] m_elements;
136 delete[] m_numUsedInBlock;
137 throw;
138 }
139
140 m_fill = deSemaphore_create(0, DE_NULL);
141 m_empty = deSemaphore_create(numBlocks, DE_NULL);
142 DE_ASSERT(m_fill && m_empty);
143 }
144
145 template <typename T>
~BlockBuffer(void)146 BlockBuffer<T>::~BlockBuffer(void)
147 {
148 delete[] m_elements;
149 delete[] m_numUsedInBlock;
150
151 deSemaphore_destroy(m_fill);
152 deSemaphore_destroy(m_empty);
153 }
154
155 template <typename T>
clear(void)156 void BlockBuffer<T>::clear(void)
157 {
158 ScopedLock readLock(m_readLock);
159 ScopedLock writeLock(m_writeLock);
160
161 deSemaphore_destroy(m_fill);
162 deSemaphore_destroy(m_empty);
163
164 m_fill = deSemaphore_create(0, DE_NULL);
165 m_empty = deSemaphore_create(m_numBlocks, DE_NULL);
166 m_writeBlock = 0;
167 m_writePos = 0;
168 m_readBlock = 0;
169 m_readPos = 0;
170 m_canceled = false;
171
172 DE_ASSERT(m_fill && m_empty);
173 }
174
175 template <typename T>
cancel(void)176 void BlockBuffer<T>::cancel(void)
177 {
178 DE_ASSERT(!m_canceled);
179 m_canceled = true;
180
181 deSemaphore_increment(m_empty);
182 deSemaphore_increment(m_fill);
183 }
184
185 template <typename T>
writeToCurrentBlock(int numElements,const T * elements,bool blocking)186 int BlockBuffer<T>::writeToCurrentBlock(int numElements, const T *elements, bool blocking)
187 {
188 DE_ASSERT(numElements > 0 && elements != DE_NULL);
189
190 if (m_writePos == 0)
191 {
192 /* Write thread doesn't own current block - need to acquire. */
193 if (blocking)
194 deSemaphore_decrement(m_empty);
195 else
196 {
197 if (!deSemaphore_tryDecrement(m_empty))
198 return 0;
199 }
200
201 /* Check for canceled bit. */
202 if (m_canceled)
203 {
204 // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here.
205 deSemaphore_increment(m_empty);
206 m_writeLock.unlock();
207 throw CanceledException();
208 }
209 }
210
211 /* Write thread owns current block. */
212 T *block = m_elements + m_writeBlock * m_blockSize;
213 int numToWrite = de::min(numElements, m_blockSize - m_writePos);
214
215 DE_ASSERT(numToWrite > 0);
216
217 for (int ndx = 0; ndx < numToWrite; ndx++)
218 block[m_writePos + ndx] = elements[ndx];
219
220 m_writePos += numToWrite;
221
222 if (m_writePos == m_blockSize)
223 flushWriteBlock(); /* Flush current write block. */
224
225 return numToWrite;
226 }
227
228 template <typename T>
readFromCurrentBlock(int numElements,T * elements,bool blocking)229 int BlockBuffer<T>::readFromCurrentBlock(int numElements, T *elements, bool blocking)
230 {
231 DE_ASSERT(numElements > 0 && elements != DE_NULL);
232
233 if (m_readPos == 0)
234 {
235 /* Read thread doesn't own current block - need to acquire. */
236 if (blocking)
237 deSemaphore_decrement(m_fill);
238 else
239 {
240 if (!deSemaphore_tryDecrement(m_fill))
241 return 0;
242 }
243
244 /* Check for canceled bit. */
245 if (m_canceled)
246 {
247 // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here.
248 deSemaphore_increment(m_fill);
249 m_readLock.unlock();
250 throw CanceledException();
251 }
252 }
253
254 /* Read thread now owns current block. */
255 const T *block = m_elements + m_readBlock * m_blockSize;
256 int numUsedInBlock = m_numUsedInBlock[m_readBlock];
257 int numToRead = de::min(numElements, numUsedInBlock - m_readPos);
258
259 DE_ASSERT(numToRead > 0);
260
261 for (int ndx = 0; ndx < numToRead; ndx++)
262 elements[ndx] = block[m_readPos + ndx];
263
264 m_readPos += numToRead;
265
266 if (m_readPos == numUsedInBlock)
267 {
268 /* Free current read block and advance. */
269 m_readBlock = (m_readBlock + 1) % m_numBlocks;
270 m_readPos = 0;
271 deSemaphore_increment(m_empty);
272 }
273
274 return numToRead;
275 }
276
277 template <typename T>
tryWrite(int numElements,const T * elements)278 int BlockBuffer<T>::tryWrite(int numElements, const T *elements)
279 {
280 int numWritten = 0;
281
282 DE_ASSERT(numElements > 0 && elements != DE_NULL);
283
284 if (m_canceled)
285 throw CanceledException();
286
287 if (!m_writeLock.tryLock())
288 return numWritten;
289
290 while (numWritten < numElements)
291 {
292 int ret = writeToCurrentBlock(numElements - numWritten, elements + numWritten, false /* non-blocking */);
293
294 if (ret == 0)
295 break; /* Write failed. */
296
297 numWritten += ret;
298 }
299
300 m_writeLock.unlock();
301
302 return numWritten;
303 }
304
305 template <typename T>
write(int numElements,const T * elements)306 void BlockBuffer<T>::write(int numElements, const T *elements)
307 {
308 DE_ASSERT(numElements > 0 && elements != DE_NULL);
309
310 if (m_canceled)
311 throw CanceledException();
312
313 m_writeLock.lock();
314
315 int numWritten = 0;
316 while (numWritten < numElements)
317 numWritten += writeToCurrentBlock(numElements - numWritten, elements + numWritten, true /* blocking */);
318
319 m_writeLock.unlock();
320 }
321
322 template <typename T>
flush(void)323 void BlockBuffer<T>::flush(void)
324 {
325 m_writeLock.lock();
326
327 if (m_writePos > 0)
328 flushWriteBlock();
329
330 m_writeLock.unlock();
331 }
332
333 template <typename T>
tryFlush(void)334 bool BlockBuffer<T>::tryFlush(void)
335 {
336 if (!m_writeLock.tryLock())
337 return false;
338
339 if (m_writePos > 0)
340 flushWriteBlock();
341
342 m_writeLock.unlock();
343
344 return true;
345 }
346
347 template <typename T>
flushWriteBlock(void)348 void BlockBuffer<T>::flushWriteBlock(void)
349 {
350 DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize));
351
352 m_numUsedInBlock[m_writeBlock] = m_writePos;
353 m_writeBlock = (m_writeBlock + 1) % m_numBlocks;
354 m_writePos = 0;
355 deSemaphore_increment(m_fill);
356 }
357
358 template <typename T>
tryRead(int numElements,T * elements)359 int BlockBuffer<T>::tryRead(int numElements, T *elements)
360 {
361 int numRead = 0;
362
363 if (m_canceled)
364 throw CanceledException();
365
366 if (!m_readLock.tryLock())
367 return numRead;
368
369 while (numRead < numElements)
370 {
371 int ret = readFromCurrentBlock(numElements - numRead, &elements[numRead], false /* non-blocking */);
372
373 if (ret == 0)
374 break; /* Failed. */
375
376 numRead += ret;
377 }
378
379 m_readLock.unlock();
380
381 return numRead;
382 }
383
384 template <typename T>
read(int numElements,T * elements)385 void BlockBuffer<T>::read(int numElements, T *elements)
386 {
387 DE_ASSERT(numElements > 0 && elements != DE_NULL);
388
389 if (m_canceled)
390 throw CanceledException();
391
392 m_readLock.lock();
393
394 int numRead = 0;
395 while (numRead < numElements)
396 numRead += readFromCurrentBlock(numElements - numRead, &elements[numRead], true /* blocking */);
397
398 m_readLock.unlock();
399 }
400
401 } // namespace de
402
403 #endif // _DEBLOCKBUFFER_HPP
404