1 /*
2 * Copyright 2019 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "rtc_base/memory/fifo_buffer.h"
12
13 #include <algorithm>
14
15 #include "rtc_base/thread.h"
16
17 namespace rtc {
18
FifoBuffer(size_t size)19 FifoBuffer::FifoBuffer(size_t size)
20 : state_(SS_OPEN),
21 buffer_(new char[size]),
22 buffer_length_(size),
23 data_length_(0),
24 read_position_(0),
25 owner_(Thread::Current()) {
26 // all events are done on the owner_ thread
27 }
28
FifoBuffer(size_t size,Thread * owner)29 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
30 : state_(SS_OPEN),
31 buffer_(new char[size]),
32 buffer_length_(size),
33 data_length_(0),
34 read_position_(0),
35 owner_(owner) {
36 // all events are done on the owner_ thread
37 }
38
~FifoBuffer()39 FifoBuffer::~FifoBuffer() {}
40
GetBuffered(size_t * size) const41 bool FifoBuffer::GetBuffered(size_t* size) const {
42 webrtc::MutexLock lock(&mutex_);
43 *size = data_length_;
44 return true;
45 }
46
GetState() const47 StreamState FifoBuffer::GetState() const {
48 webrtc::MutexLock lock(&mutex_);
49 return state_;
50 }
51
Read(rtc::ArrayView<uint8_t> buffer,size_t & bytes_read,int & error)52 StreamResult FifoBuffer::Read(rtc::ArrayView<uint8_t> buffer,
53 size_t& bytes_read,
54 int& error) {
55 webrtc::MutexLock lock(&mutex_);
56 const bool was_writable = data_length_ < buffer_length_;
57 size_t copy = 0;
58 StreamResult result = ReadLocked(buffer.data(), buffer.size(), ©);
59
60 if (result == SR_SUCCESS) {
61 // If read was successful then adjust the read position and number of
62 // bytes buffered.
63 read_position_ = (read_position_ + copy) % buffer_length_;
64 data_length_ -= copy;
65 bytes_read = copy;
66
67 // if we were full before, and now we're not, post an event
68 if (!was_writable && copy > 0) {
69 PostEvent(SE_WRITE, 0);
70 }
71 }
72 return result;
73 }
74
Write(rtc::ArrayView<const uint8_t> buffer,size_t & bytes_written,int & error)75 StreamResult FifoBuffer::Write(rtc::ArrayView<const uint8_t> buffer,
76 size_t& bytes_written,
77 int& error) {
78 webrtc::MutexLock lock(&mutex_);
79
80 const bool was_readable = (data_length_ > 0);
81 size_t copy = 0;
82 StreamResult result = WriteLocked(buffer.data(), buffer.size(), ©);
83
84 if (result == SR_SUCCESS) {
85 // If write was successful then adjust the number of readable bytes.
86 data_length_ += copy;
87 bytes_written = copy;
88 // if we didn't have any data to read before, and now we do, post an event
89 if (!was_readable && copy > 0) {
90 PostEvent(SE_READ, 0);
91 }
92 }
93 return result;
94 }
95
Close()96 void FifoBuffer::Close() {
97 webrtc::MutexLock lock(&mutex_);
98 state_ = SS_CLOSED;
99 }
100
GetReadData(size_t * size)101 const void* FifoBuffer::GetReadData(size_t* size) {
102 webrtc::MutexLock lock(&mutex_);
103 *size = (read_position_ + data_length_ <= buffer_length_)
104 ? data_length_
105 : buffer_length_ - read_position_;
106 return &buffer_[read_position_];
107 }
108
ConsumeReadData(size_t size)109 void FifoBuffer::ConsumeReadData(size_t size) {
110 webrtc::MutexLock lock(&mutex_);
111 RTC_DCHECK(size <= data_length_);
112 const bool was_writable = data_length_ < buffer_length_;
113 read_position_ = (read_position_ + size) % buffer_length_;
114 data_length_ -= size;
115 if (!was_writable && size > 0) {
116 PostEvent(SE_WRITE, 0);
117 }
118 }
119
GetWriteBuffer(size_t * size)120 void* FifoBuffer::GetWriteBuffer(size_t* size) {
121 webrtc::MutexLock lock(&mutex_);
122 if (state_ == SS_CLOSED) {
123 return nullptr;
124 }
125
126 // if empty, reset the write position to the beginning, so we can get
127 // the biggest possible block
128 if (data_length_ == 0) {
129 read_position_ = 0;
130 }
131
132 const size_t write_position =
133 (read_position_ + data_length_) % buffer_length_;
134 *size = (write_position > read_position_ || data_length_ == 0)
135 ? buffer_length_ - write_position
136 : read_position_ - write_position;
137 return &buffer_[write_position];
138 }
139
ConsumeWriteBuffer(size_t size)140 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
141 webrtc::MutexLock lock(&mutex_);
142 RTC_DCHECK(size <= buffer_length_ - data_length_);
143 const bool was_readable = (data_length_ > 0);
144 data_length_ += size;
145 if (!was_readable && size > 0) {
146 PostEvent(SE_READ, 0);
147 }
148 }
149
ReadLocked(void * buffer,size_t bytes,size_t * bytes_read)150 StreamResult FifoBuffer::ReadLocked(void* buffer,
151 size_t bytes,
152 size_t* bytes_read) {
153 if (data_length_ == 0) {
154 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
155 }
156
157 const size_t available = data_length_;
158 const size_t read_position = read_position_ % buffer_length_;
159 const size_t copy = std::min(bytes, available);
160 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
161 char* const p = static_cast<char*>(buffer);
162 memcpy(p, &buffer_[read_position], tail_copy);
163 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
164
165 if (bytes_read) {
166 *bytes_read = copy;
167 }
168 return SR_SUCCESS;
169 }
170
WriteLocked(const void * buffer,size_t bytes,size_t * bytes_written)171 StreamResult FifoBuffer::WriteLocked(const void* buffer,
172 size_t bytes,
173 size_t* bytes_written) {
174 if (state_ == SS_CLOSED) {
175 return SR_EOS;
176 }
177
178 if (data_length_ >= buffer_length_) {
179 return SR_BLOCK;
180 }
181
182 const size_t available = buffer_length_ - data_length_;
183 const size_t write_position =
184 (read_position_ + data_length_) % buffer_length_;
185 const size_t copy = std::min(bytes, available);
186 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
187 const char* const p = static_cast<const char*>(buffer);
188 memcpy(&buffer_[write_position], p, tail_copy);
189 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
190
191 if (bytes_written) {
192 *bytes_written = copy;
193 }
194 return SR_SUCCESS;
195 }
196
197 } // namespace rtc
198