1 /*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Buffered and threaded input and output streams
22 *//*--------------------------------------------------------------------*/
23
24 #include "deThreadStream.h"
25 #include "deStreamCpyThread.h"
26 #include "deRingbuffer.h"
27 #include "stdlib.h"
28
29 typedef struct deThreadInStream_s
30 {
31 deRingbuffer *ringbuffer;
32 deInStream *input;
33 deInStream consumerStream;
34 deOutStream producerStream;
35 deThread thread;
36 int bufferSize;
37 } deThreadInStream;
38
39 typedef struct deThreadOutStream_s
40 {
41 deRingbuffer *ringbuffer;
42 deInStream consumerStream;
43 deOutStream producerStream;
44 deStreamCpyThread *thread;
45 } deThreadOutStream;
46
inStreamCopy(void * arg)47 static void inStreamCopy(void *arg)
48 {
49 deThreadInStream *threadStream = (deThreadInStream *)arg;
50
51 uint8_t *buffer = malloc(sizeof(uint8_t) * (size_t)threadStream->bufferSize);
52
53 for (;;)
54 {
55 int32_t read = 0;
56 int32_t written = 0;
57 deStreamResult readResult = DE_STREAMRESULT_ERROR;
58
59 readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
60 DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
61 while (written < read)
62 {
63 int32_t wrote = 0;
64
65 /* \todo [mika] Handle errors */
66 deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
67
68 written += wrote;
69 }
70
71 if (readResult == DE_STREAMRESULT_END_OF_STREAM)
72 {
73 break;
74 }
75 }
76
77 deOutStream_flush(&(threadStream->producerStream));
78 deRingbuffer_stop(threadStream->ringbuffer);
79 free(buffer);
80 }
81
threadInStream_read(deStreamData * stream,void * buf,int32_t bufSize,int32_t * numRead)82 static deStreamResult threadInStream_read(deStreamData *stream, void *buf, int32_t bufSize, int32_t *numRead)
83 {
84 deThreadInStream *threadStream = (deThreadInStream *)stream;
85 return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
86 }
87
/*
 * Error-string callback of the threaded input stream.
 *
 * Returns whatever the consumer stream reports.
 * \todo [mika] Add handling for errors on thread stream
 */
static const char *threadInStream_getError(deStreamData *stream)
{
    deInStream *const consumer = &((deThreadInStream *)stream)->consumerStream;

    return deInStream_getError(consumer);
}
95
threadInStream_getStatus(deStreamData * stream)96 static deStreamStatus threadInStream_getStatus(deStreamData *stream)
97 {
98 deThreadInStream *threadStream = (deThreadInStream *)stream;
99
100 /* \todo [mika] Add handling for status on thread stream */
101 return deInStream_getStatus(&(threadStream->consumerStream));
102 }
103
104 /* \note [mika] Used by both in and out stream */
threadStream_deinit(deStreamData * stream)105 static deStreamResult threadStream_deinit(deStreamData *stream)
106 {
107 deThreadInStream *threadStream = (deThreadInStream *)stream;
108
109 deRingbuffer_stop(threadStream->ringbuffer);
110
111 deThread_join(threadStream->thread);
112 deThread_destroy(threadStream->thread);
113
114 deOutStream_deinit(&(threadStream->producerStream));
115 deInStream_deinit(&(threadStream->consumerStream));
116
117 deRingbuffer_destroy(threadStream->ringbuffer);
118
119 return DE_STREAMRESULT_SUCCESS;
120 }
121
122 static const deIOStreamVFTable threadInStreamVFTable = {
123 threadInStream_read, DE_NULL, threadInStream_getError, DE_NULL, threadStream_deinit, threadInStream_getStatus};
124
/*
 * Initializes `stream` as a threaded wrapper around `input`.
 *
 * \param stream                Stream object to set up.
 * \param input                 Source stream the copy thread reads from.
 * \param ringbufferBlockSize   Block size passed to deRingbuffer_create(); also
 *                              used as the copy thread's buffer size.
 * \param ringbufferBlockCount  Block count passed to deRingbuffer_create().
 *
 * Allocation failures are only caught by DE_ASSERT.
 */
void deThreadInStream_init(deInStream *stream, deInStream *input, int ringbufferBlockSize, int ringbufferBlockCount)
{
    deThreadInStream *threadStream = malloc(sizeof *threadStream);
    DE_ASSERT(threadStream);

    threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
    DE_ASSERT(threadStream->ringbuffer);

    threadStream->input = input;
    threadStream->bufferSize = ringbufferBlockSize;

    /* Attach both internal streams to the ring buffer. */
    deProducerStream_init(&threadStream->producerStream, threadStream->ringbuffer);
    deConsumerStream_init(&threadStream->consumerStream, threadStream->ringbuffer);

    /* Start the copy thread only after every field it reads has been set. */
    threadStream->thread = deThread_create(inStreamCopy, threadStream, DE_NULL);

    stream->ioStream.vfTable = &threadInStreamVFTable;
    stream->ioStream.streamData = threadStream;
}
144
threadOutStream_write(deStreamData * stream,const void * buf,int32_t bufSize,int32_t * numWritten)145 static deStreamResult threadOutStream_write(deStreamData *stream, const void *buf, int32_t bufSize, int32_t *numWritten)
146 {
147 deThreadOutStream *threadStream = (deThreadOutStream *)stream;
148 return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
149 }
150
/*
 * Error-string callback of the threaded output stream.
 *
 * Returns whatever the producer stream reports.
 * \todo [mika] Add handling for errors on thread stream
 */
static const char *threadOutStream_getError(deStreamData *stream)
{
    deOutStream *const producer = &((deThreadOutStream *)stream)->producerStream;

    return deOutStream_getError(producer);
}
158
threadOutStream_getStatus(deStreamData * stream)159 static deStreamStatus threadOutStream_getStatus(deStreamData *stream)
160 {
161 deThreadOutStream *threadStream = (deThreadOutStream *)stream;
162
163 /* \todo [mika] Add handling for errors on thread stream */
164 return deOutStream_getStatus(&(threadStream->producerStream));
165 }
166
threadOutStream_flush(deStreamData * stream)167 static deStreamResult threadOutStream_flush(deStreamData *stream)
168 {
169 deThreadOutStream *threadStream = (deThreadOutStream *)stream;
170
171 return deOutStream_flush(&(threadStream->producerStream));
172 }
173
174 static const deIOStreamVFTable threadOutStreamVFTable = {DE_NULL,
175 threadOutStream_write,
176 threadOutStream_getError,
177 threadOutStream_flush,
178 threadStream_deinit,
179 threadOutStream_getStatus};
180
/*
 * Initializes `stream` as a threaded wrapper around `output`.
 *
 * \param stream                Stream object to set up.
 * \param output                Destination stream handed to the copy thread.
 * \param ringbufferBlockSize   Block size passed to deRingbuffer_create(); also
 *                              passed to deStreamCpyThread_create() as buffer size.
 * \param ringbufferBlockCount  Block count passed to deRingbuffer_create().
 *
 * Allocation failures are only caught by DE_ASSERT.
 */
void deThreadOutStream_init(deOutStream *stream, deOutStream *output, int ringbufferBlockSize, int ringbufferBlockCount)
{
    deThreadOutStream *threadStream = malloc(sizeof *threadStream);
    DE_ASSERT(threadStream);

    threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
    DE_ASSERT(threadStream->ringbuffer);

    /* Attach both internal streams to the ring buffer. */
    deProducerStream_init(&threadStream->producerStream, threadStream->ringbuffer);
    deConsumerStream_init(&threadStream->consumerStream, threadStream->ringbuffer);

    /* The copy thread reads from the consumer stream and writes to `output`. */
    threadStream->thread = deStreamCpyThread_create(&threadStream->consumerStream, output, ringbufferBlockSize);

    stream->ioStream.vfTable = &threadOutStreamVFTable;
    stream->ioStream.streamData = threadStream;
}
198