xref: /aosp_15_r20/external/deqp/framework/delibs/decpp/deBlockBuffer.hpp (revision 35238bce31c2a825756842865a792f8cf7f89930)
1 #ifndef _DEBLOCKBUFFER_HPP
2 #define _DEBLOCKBUFFER_HPP
3 /*-------------------------------------------------------------------------
4  * drawElements C++ Base Library
5  * -----------------------------
6  *
7  * Copyright 2014 The Android Open Source Project
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  *//*!
22  * \file
23  * \brief Block-based thread-safe queue.
24  *//*--------------------------------------------------------------------*/
25 
26 #include "deBlockBuffer.hpp"
27 #include "deMutex.hpp"
28 #include "deSemaphore.h"
29 
30 #include <exception>
31 
32 namespace de
33 {
34 
35 void BlockBuffer_selfTest(void);
36 
37 class BufferCanceledException : public std::exception
38 {
39 public:
BufferCanceledException(void)40     inline BufferCanceledException(void)
41     {
42     }
~BufferCanceledException(void)43     inline ~BufferCanceledException(void) throw()
44     {
45     }
46 
what(void) const47     const char *what(void) const throw()
48     {
49         return "BufferCanceledException";
50     }
51 };
52 
53 template <typename T>
54 class BlockBuffer
55 {
56 public:
57     typedef BufferCanceledException CanceledException;
58 
59     BlockBuffer(int blockSize, int numBlocks);
60     ~BlockBuffer(void);
61 
62     void clear(void); //!< Resets buffer. Will block until pending writes and reads have completed.
63 
64     void write(int numElements, const T *elements);
65     int tryWrite(int numElements, const T *elements);
66     void flush(void);
67     bool tryFlush(void);
68 
69     void read(int numElements, T *elements);
70     int tryRead(int numElements, T *elements);
71 
72     void cancel(
73         void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException.
isCanceled(void) const74     bool isCanceled(void) const
75     {
76         return !!m_canceled;
77     }
78 
79 private:
80     BlockBuffer(const BlockBuffer &other);
81     BlockBuffer &operator=(const BlockBuffer &other);
82 
83     int writeToCurrentBlock(int numElements, const T *elements, bool blocking);
84     int readFromCurrentBlock(int numElements, T *elements, bool blocking);
85 
86     void flushWriteBlock(void);
87 
88     deSemaphore m_fill;  //!< Block fill count.
89     deSemaphore m_empty; //!< Block empty count.
90 
91     int m_writeBlock; //!< Current write block ndx.
92     int m_writePos;   //!< Position in block. 0 if block is not yet acquired.
93 
94     int m_readBlock; //!< Current read block ndx.
95     int m_readPos;   //!< Position in block. 0 if block is not yet acquired.
96 
97     int m_blockSize;
98     int m_numBlocks;
99 
100     T *m_elements;
101     int *m_numUsedInBlock;
102 
103     Mutex m_writeLock;
104     Mutex m_readLock;
105 
106     volatile uint32_t m_canceled;
107 } DE_WARN_UNUSED_TYPE;
108 
109 template <typename T>
BlockBuffer(int blockSize,int numBlocks)110 BlockBuffer<T>::BlockBuffer(int blockSize, int numBlocks)
111     : m_fill(0)
112     , m_empty(0)
113     , m_writeBlock(0)
114     , m_writePos(0)
115     , m_readBlock(0)
116     , m_readPos(0)
117     , m_blockSize(blockSize)
118     , m_numBlocks(numBlocks)
119     , m_elements(DE_NULL)
120     , m_numUsedInBlock(DE_NULL)
121     , m_writeLock()
122     , m_readLock()
123     , m_canceled(false)
124 {
125     DE_ASSERT(blockSize > 0);
126     DE_ASSERT(numBlocks > 0);
127 
128     try
129     {
130         m_elements       = new T[m_numBlocks * m_blockSize];
131         m_numUsedInBlock = new int[m_numBlocks];
132     }
133     catch (...)
134     {
135         delete[] m_elements;
136         delete[] m_numUsedInBlock;
137         throw;
138     }
139 
140     m_fill  = deSemaphore_create(0, DE_NULL);
141     m_empty = deSemaphore_create(numBlocks, DE_NULL);
142     DE_ASSERT(m_fill && m_empty);
143 }
144 
145 template <typename T>
~BlockBuffer(void)146 BlockBuffer<T>::~BlockBuffer(void)
147 {
148     delete[] m_elements;
149     delete[] m_numUsedInBlock;
150 
151     deSemaphore_destroy(m_fill);
152     deSemaphore_destroy(m_empty);
153 }
154 
155 template <typename T>
clear(void)156 void BlockBuffer<T>::clear(void)
157 {
158     ScopedLock readLock(m_readLock);
159     ScopedLock writeLock(m_writeLock);
160 
161     deSemaphore_destroy(m_fill);
162     deSemaphore_destroy(m_empty);
163 
164     m_fill       = deSemaphore_create(0, DE_NULL);
165     m_empty      = deSemaphore_create(m_numBlocks, DE_NULL);
166     m_writeBlock = 0;
167     m_writePos   = 0;
168     m_readBlock  = 0;
169     m_readPos    = 0;
170     m_canceled   = false;
171 
172     DE_ASSERT(m_fill && m_empty);
173 }
174 
175 template <typename T>
cancel(void)176 void BlockBuffer<T>::cancel(void)
177 {
178     DE_ASSERT(!m_canceled);
179     m_canceled = true;
180 
181     deSemaphore_increment(m_empty);
182     deSemaphore_increment(m_fill);
183 }
184 
185 template <typename T>
writeToCurrentBlock(int numElements,const T * elements,bool blocking)186 int BlockBuffer<T>::writeToCurrentBlock(int numElements, const T *elements, bool blocking)
187 {
188     DE_ASSERT(numElements > 0 && elements != DE_NULL);
189 
190     if (m_writePos == 0)
191     {
192         /* Write thread doesn't own current block - need to acquire. */
193         if (blocking)
194             deSemaphore_decrement(m_empty);
195         else
196         {
197             if (!deSemaphore_tryDecrement(m_empty))
198                 return 0;
199         }
200 
201         /* Check for canceled bit. */
202         if (m_canceled)
203         {
204             // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here.
205             deSemaphore_increment(m_empty);
206             m_writeLock.unlock();
207             throw CanceledException();
208         }
209     }
210 
211     /* Write thread owns current block. */
212     T *block       = m_elements + m_writeBlock * m_blockSize;
213     int numToWrite = de::min(numElements, m_blockSize - m_writePos);
214 
215     DE_ASSERT(numToWrite > 0);
216 
217     for (int ndx = 0; ndx < numToWrite; ndx++)
218         block[m_writePos + ndx] = elements[ndx];
219 
220     m_writePos += numToWrite;
221 
222     if (m_writePos == m_blockSize)
223         flushWriteBlock(); /* Flush current write block. */
224 
225     return numToWrite;
226 }
227 
228 template <typename T>
readFromCurrentBlock(int numElements,T * elements,bool blocking)229 int BlockBuffer<T>::readFromCurrentBlock(int numElements, T *elements, bool blocking)
230 {
231     DE_ASSERT(numElements > 0 && elements != DE_NULL);
232 
233     if (m_readPos == 0)
234     {
235         /* Read thread doesn't own current block - need to acquire. */
236         if (blocking)
237             deSemaphore_decrement(m_fill);
238         else
239         {
240             if (!deSemaphore_tryDecrement(m_fill))
241                 return 0;
242         }
243 
244         /* Check for canceled bit. */
245         if (m_canceled)
246         {
247             // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here.
248             deSemaphore_increment(m_fill);
249             m_readLock.unlock();
250             throw CanceledException();
251         }
252     }
253 
254     /* Read thread now owns current block. */
255     const T *block     = m_elements + m_readBlock * m_blockSize;
256     int numUsedInBlock = m_numUsedInBlock[m_readBlock];
257     int numToRead      = de::min(numElements, numUsedInBlock - m_readPos);
258 
259     DE_ASSERT(numToRead > 0);
260 
261     for (int ndx = 0; ndx < numToRead; ndx++)
262         elements[ndx] = block[m_readPos + ndx];
263 
264     m_readPos += numToRead;
265 
266     if (m_readPos == numUsedInBlock)
267     {
268         /* Free current read block and advance. */
269         m_readBlock = (m_readBlock + 1) % m_numBlocks;
270         m_readPos   = 0;
271         deSemaphore_increment(m_empty);
272     }
273 
274     return numToRead;
275 }
276 
277 template <typename T>
tryWrite(int numElements,const T * elements)278 int BlockBuffer<T>::tryWrite(int numElements, const T *elements)
279 {
280     int numWritten = 0;
281 
282     DE_ASSERT(numElements > 0 && elements != DE_NULL);
283 
284     if (m_canceled)
285         throw CanceledException();
286 
287     if (!m_writeLock.tryLock())
288         return numWritten;
289 
290     while (numWritten < numElements)
291     {
292         int ret = writeToCurrentBlock(numElements - numWritten, elements + numWritten, false /* non-blocking */);
293 
294         if (ret == 0)
295             break; /* Write failed. */
296 
297         numWritten += ret;
298     }
299 
300     m_writeLock.unlock();
301 
302     return numWritten;
303 }
304 
305 template <typename T>
write(int numElements,const T * elements)306 void BlockBuffer<T>::write(int numElements, const T *elements)
307 {
308     DE_ASSERT(numElements > 0 && elements != DE_NULL);
309 
310     if (m_canceled)
311         throw CanceledException();
312 
313     m_writeLock.lock();
314 
315     int numWritten = 0;
316     while (numWritten < numElements)
317         numWritten += writeToCurrentBlock(numElements - numWritten, elements + numWritten, true /* blocking */);
318 
319     m_writeLock.unlock();
320 }
321 
322 template <typename T>
flush(void)323 void BlockBuffer<T>::flush(void)
324 {
325     m_writeLock.lock();
326 
327     if (m_writePos > 0)
328         flushWriteBlock();
329 
330     m_writeLock.unlock();
331 }
332 
333 template <typename T>
tryFlush(void)334 bool BlockBuffer<T>::tryFlush(void)
335 {
336     if (!m_writeLock.tryLock())
337         return false;
338 
339     if (m_writePos > 0)
340         flushWriteBlock();
341 
342     m_writeLock.unlock();
343 
344     return true;
345 }
346 
347 template <typename T>
flushWriteBlock(void)348 void BlockBuffer<T>::flushWriteBlock(void)
349 {
350     DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize));
351 
352     m_numUsedInBlock[m_writeBlock] = m_writePos;
353     m_writeBlock                   = (m_writeBlock + 1) % m_numBlocks;
354     m_writePos                     = 0;
355     deSemaphore_increment(m_fill);
356 }
357 
358 template <typename T>
tryRead(int numElements,T * elements)359 int BlockBuffer<T>::tryRead(int numElements, T *elements)
360 {
361     int numRead = 0;
362 
363     if (m_canceled)
364         throw CanceledException();
365 
366     if (!m_readLock.tryLock())
367         return numRead;
368 
369     while (numRead < numElements)
370     {
371         int ret = readFromCurrentBlock(numElements - numRead, &elements[numRead], false /* non-blocking */);
372 
373         if (ret == 0)
374             break; /* Failed. */
375 
376         numRead += ret;
377     }
378 
379     m_readLock.unlock();
380 
381     return numRead;
382 }
383 
384 template <typename T>
read(int numElements,T * elements)385 void BlockBuffer<T>::read(int numElements, T *elements)
386 {
387     DE_ASSERT(numElements > 0 && elements != DE_NULL);
388 
389     if (m_canceled)
390         throw CanceledException();
391 
392     m_readLock.lock();
393 
394     int numRead = 0;
395     while (numRead < numElements)
396         numRead += readFromCurrentBlock(numElements - numRead, &elements[numRead], true /* blocking */);
397 
398     m_readLock.unlock();
399 }
400 
401 } // namespace de
402 
403 #endif // _DEBLOCKBUFFER_HPP
404