xref: /aosp_15_r20/external/deqp/framework/delibs/destream/deThreadStream.c (revision 35238bce31c2a825756842865a792f8cf7f89930)
1 /*-------------------------------------------------------------------------
2  * drawElements Stream Library
3  * ---------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Buffered and threaded input and output streams
22  *//*--------------------------------------------------------------------*/
23 
24 #include "deThreadStream.h"
25 #include "deStreamCpyThread.h"
26 #include "deRingbuffer.h"
27 #include "stdlib.h"
28 
29 typedef struct deThreadInStream_s
30 {
31     deRingbuffer *ringbuffer;
32     deInStream *input;
33     deInStream consumerStream;
34     deOutStream producerStream;
35     deThread thread;
36     int bufferSize;
37 } deThreadInStream;
38 
39 typedef struct deThreadOutStream_s
40 {
41     deRingbuffer *ringbuffer;
42     deInStream consumerStream;
43     deOutStream producerStream;
44     deStreamCpyThread *thread;
45 } deThreadOutStream;
46 
inStreamCopy(void * arg)47 static void inStreamCopy(void *arg)
48 {
49     deThreadInStream *threadStream = (deThreadInStream *)arg;
50 
51     uint8_t *buffer = malloc(sizeof(uint8_t) * (size_t)threadStream->bufferSize);
52 
53     for (;;)
54     {
55         int32_t read              = 0;
56         int32_t written           = 0;
57         deStreamResult readResult = DE_STREAMRESULT_ERROR;
58 
59         readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
60         DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
61         while (written < read)
62         {
63             int32_t wrote = 0;
64 
65             /* \todo [mika] Handle errors */
66             deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
67 
68             written += wrote;
69         }
70 
71         if (readResult == DE_STREAMRESULT_END_OF_STREAM)
72         {
73             break;
74         }
75     }
76 
77     deOutStream_flush(&(threadStream->producerStream));
78     deRingbuffer_stop(threadStream->ringbuffer);
79     free(buffer);
80 }
81 
threadInStream_read(deStreamData * stream,void * buf,int32_t bufSize,int32_t * numRead)82 static deStreamResult threadInStream_read(deStreamData *stream, void *buf, int32_t bufSize, int32_t *numRead)
83 {
84     deThreadInStream *threadStream = (deThreadInStream *)stream;
85     return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
86 }
87 
/*--------------------------------------------------------------------*//*!
 * \brief Error-string callback of the threaded input stream.
 *
 * Only the consumer stream is queried.
 *
 * \todo [mika] Add handling for errors on thread stream
 *
 * \param stream Stream data; a deThreadInStream.
 * \return Result of deInStream_getError() on the consumer stream.
 *//*--------------------------------------------------------------------*/
static const char *threadInStream_getError(deStreamData *stream)
{
    deInStream *const consumer = &((deThreadInStream *)stream)->consumerStream;

    return deInStream_getError(consumer);
}
95 
threadInStream_getStatus(deStreamData * stream)96 static deStreamStatus threadInStream_getStatus(deStreamData *stream)
97 {
98     deThreadInStream *threadStream = (deThreadInStream *)stream;
99 
100     /* \todo [mika] Add handling for status on thread stream */
101     return deInStream_getStatus(&(threadStream->consumerStream));
102 }
103 
104 /* \note [mika] Used by both in and out stream */
threadStream_deinit(deStreamData * stream)105 static deStreamResult threadStream_deinit(deStreamData *stream)
106 {
107     deThreadInStream *threadStream = (deThreadInStream *)stream;
108 
109     deRingbuffer_stop(threadStream->ringbuffer);
110 
111     deThread_join(threadStream->thread);
112     deThread_destroy(threadStream->thread);
113 
114     deOutStream_deinit(&(threadStream->producerStream));
115     deInStream_deinit(&(threadStream->consumerStream));
116 
117     deRingbuffer_destroy(threadStream->ringbuffer);
118 
119     return DE_STREAMRESULT_SUCCESS;
120 }
121 
122 static const deIOStreamVFTable threadInStreamVFTable = {
123     threadInStream_read, DE_NULL, threadInStream_getError, DE_NULL, threadStream_deinit, threadInStream_getStatus};
124 
/*--------------------------------------------------------------------*//*!
 * \brief Initialise \p stream as a threaded wrapper around \p input.
 *
 * Allocates the stream state, creates the ring buffer with its producer
 * and consumer streams, and starts a thread running inStreamCopy() that
 * moves data from \p input into the ring buffer.
 *
 * If an allocation fails, everything acquired so far is released and
 * \p stream is left with a DE_NULL vfTable and DE_NULL streamData.
 *
 * \note NOTE(review): the handle returned by deThread_create() is not
 *       validated here; confirm how that function reports failure.
 *
 * \param stream               Stream object to initialise.
 * \param input                Stream to read from; only the pointer is
 *                             stored.
 * \param ringbufferBlockSize  Passed to deRingbuffer_create(); also used
 *                             as the copy thread's staging buffer size.
 * \param ringbufferBlockCount Passed to deRingbuffer_create().
 *//*--------------------------------------------------------------------*/
void deThreadInStream_init(deInStream *stream, deInStream *input, int ringbufferBlockSize, int ringbufferBlockCount)
{
    deThreadInStream *threadStream = DE_NULL;

    /* Keep the stream in a well-defined empty state until setup succeeds. */
    stream->ioStream.vfTable    = DE_NULL;
    stream->ioStream.streamData = DE_NULL;

    threadStream = malloc(sizeof *threadStream);
    DE_ASSERT(threadStream);
    if (!threadStream)
    {
        return;
    }

    threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
    DE_ASSERT(threadStream->ringbuffer);
    if (!threadStream->ringbuffer)
    {
        free(threadStream);
        return;
    }

    threadStream->bufferSize = ringbufferBlockSize;
    threadStream->input      = input;
    deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
    deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);

    threadStream->thread        = deThread_create(inStreamCopy, threadStream, DE_NULL);
    stream->ioStream.vfTable    = &threadInStreamVFTable;
    stream->ioStream.streamData = threadStream;
}
144 
threadOutStream_write(deStreamData * stream,const void * buf,int32_t bufSize,int32_t * numWritten)145 static deStreamResult threadOutStream_write(deStreamData *stream, const void *buf, int32_t bufSize, int32_t *numWritten)
146 {
147     deThreadOutStream *threadStream = (deThreadOutStream *)stream;
148     return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
149 }
150 
/*--------------------------------------------------------------------*//*!
 * \brief Error-string callback of the threaded output stream.
 *
 * Only the producer stream is queried.
 *
 * \todo [mika] Add handling for errors on thread stream
 *
 * \param stream Stream data; a deThreadOutStream.
 * \return Result of deOutStream_getError() on the producer stream.
 *//*--------------------------------------------------------------------*/
static const char *threadOutStream_getError(deStreamData *stream)
{
    deOutStream *const producer = &((deThreadOutStream *)stream)->producerStream;

    return deOutStream_getError(producer);
}
158 
threadOutStream_getStatus(deStreamData * stream)159 static deStreamStatus threadOutStream_getStatus(deStreamData *stream)
160 {
161     deThreadOutStream *threadStream = (deThreadOutStream *)stream;
162 
163     /* \todo [mika] Add handling for errors on thread stream */
164     return deOutStream_getStatus(&(threadStream->producerStream));
165 }
166 
threadOutStream_flush(deStreamData * stream)167 static deStreamResult threadOutStream_flush(deStreamData *stream)
168 {
169     deThreadOutStream *threadStream = (deThreadOutStream *)stream;
170 
171     return deOutStream_flush(&(threadStream->producerStream));
172 }
173 
174 static const deIOStreamVFTable threadOutStreamVFTable = {DE_NULL,
175                                                          threadOutStream_write,
176                                                          threadOutStream_getError,
177                                                          threadOutStream_flush,
178                                                          threadStream_deinit,
179                                                          threadOutStream_getStatus};
180 
/*--------------------------------------------------------------------*//*!
 * \brief Initialise \p stream as a threaded wrapper around \p output.
 *
 * Allocates the stream state, creates the ring buffer with its producer
 * and consumer streams, and creates a deStreamCpyThread that is handed
 * the consumer stream and \p output.
 *
 * If any step fails, everything acquired so far is released and
 * \p stream is left with a DE_NULL vfTable and DE_NULL streamData.
 *
 * \param stream               Stream object to initialise.
 * \param output               Destination stream passed to
 *                             deStreamCpyThread_create().
 * \param ringbufferBlockSize  Passed to deRingbuffer_create() and to
 *                             deStreamCpyThread_create().
 * \param ringbufferBlockCount Passed to deRingbuffer_create().
 *//*--------------------------------------------------------------------*/
void deThreadOutStream_init(deOutStream *stream, deOutStream *output, int ringbufferBlockSize, int ringbufferBlockCount)
{
    deThreadOutStream *threadStream = DE_NULL;

    /* Keep the stream in a well-defined empty state until setup succeeds. */
    stream->ioStream.vfTable    = DE_NULL;
    stream->ioStream.streamData = DE_NULL;

    threadStream = malloc(sizeof *threadStream);
    DE_ASSERT(threadStream);
    if (!threadStream)
    {
        return;
    }

    threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
    DE_ASSERT(threadStream->ringbuffer);
    if (!threadStream->ringbuffer)
    {
        free(threadStream);
        return;
    }

    deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
    deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);

    threadStream->thread = deStreamCpyThread_create(&(threadStream->consumerStream), output, ringbufferBlockSize);
    DE_ASSERT(threadStream->thread);
    if (!threadStream->thread)
    {
        /* Without a copy thread the stream cannot work; undo the setup. */
        deOutStream_deinit(&(threadStream->producerStream));
        deInStream_deinit(&(threadStream->consumerStream));
        deRingbuffer_destroy(threadStream->ringbuffer);
        free(threadStream);
        return;
    }

    stream->ioStream.vfTable    = &threadOutStreamVFTable;
    stream->ioStream.streamData = threadStream;
}
198