xref: /aosp_15_r20/external/pigweed/pw_ring_buffer/prefixed_entry_ring_buffer.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
16 
17 #include <algorithm>
18 #include <cstring>
19 
20 #include "pw_assert/assert.h"
21 #include "pw_assert/check.h"
22 #include "pw_status/try.h"
23 #include "pw_varint/varint.h"
24 
25 namespace pw {
26 namespace ring_buffer {
27 
28 using std::byte;
29 using Entry = PrefixedEntryRingBufferMulti::Entry;
30 using Reader = PrefixedEntryRingBufferMulti::Reader;
31 using iterator = PrefixedEntryRingBufferMulti::iterator;
32 
Clear()33 void PrefixedEntryRingBufferMulti::Clear() {
34   write_idx_ = 0;
35   for (Reader& reader : readers_) {
36     reader.read_idx_ = 0;
37     reader.entry_count_ = 0;
38   }
39 }
40 
SetBuffer(span<byte> buffer)41 Status PrefixedEntryRingBufferMulti::SetBuffer(span<byte> buffer) {
42   if ((buffer.data() == nullptr) ||  //
43       (buffer.size_bytes() == 0) ||  //
44       (buffer.size_bytes() > kMaxBufferBytes)) {
45     return Status::InvalidArgument();
46   }
47 
48   buffer_ = buffer.data();
49   buffer_bytes_ = buffer.size_bytes();
50 
51   Clear();
52   return OkStatus();
53 }
54 
AttachReader(Reader & reader)55 Status PrefixedEntryRingBufferMulti::AttachReader(Reader& reader) {
56   if (reader.buffer_ != nullptr) {
57     return Status::InvalidArgument();
58   }
59   reader.buffer_ = this;
60 
61   if (readers_.empty()) {
62     reader.read_idx_ = write_idx_;
63     reader.entry_count_ = 0;
64   } else {
65     const Reader& slowest_reader = GetSlowestReader();
66     reader.read_idx_ = slowest_reader.read_idx_;
67     reader.entry_count_ = slowest_reader.entry_count_;
68   }
69 
70   readers_.push_back(reader);
71   return OkStatus();
72 }
73 
DetachReader(Reader & reader)74 Status PrefixedEntryRingBufferMulti::DetachReader(Reader& reader) {
75   if (reader.buffer_ != this) {
76     return Status::InvalidArgument();
77   }
78   reader.buffer_ = nullptr;
79   reader.read_idx_ = 0;
80   reader.entry_count_ = 0;
81   readers_.remove(reader);
82   return OkStatus();
83 }
84 
InternalPushBack(span<const byte> data,uint32_t user_preamble_data,bool pop_front_if_needed)85 Status PrefixedEntryRingBufferMulti::InternalPushBack(
86     span<const byte> data,
87     uint32_t user_preamble_data,
88     bool pop_front_if_needed) {
89   if (buffer_ == nullptr) {
90     return Status::FailedPrecondition();
91   }
92 
93   // Prepare a single buffer that can hold both the user preamble and entry
94   // length.
95   byte preamble_buf[varint::kMaxVarint32SizeBytes * 2];
96   size_t user_preamble_bytes = 0;
97   if (user_preamble_) {
98     user_preamble_bytes =
99         varint::Encode<uint32_t>(user_preamble_data, preamble_buf);
100   }
101   size_t length_bytes =
102       varint::Encode<uint32_t>(static_cast<uint32_t>(data.size_bytes()),
103                                span(preamble_buf).subspan(user_preamble_bytes));
104   size_t total_write_bytes =
105       user_preamble_bytes + length_bytes + data.size_bytes();
106   if (buffer_bytes_ < total_write_bytes) {
107     return Status::OutOfRange();
108   }
109 
110   if (pop_front_if_needed) {
111     // PushBack() case: evict items as needed.
112     // Drop old entries until we have space for the new entry.
113     while (RawAvailableBytes() < total_write_bytes) {
114       InternalPopFrontAll();
115     }
116   } else if (RawAvailableBytes() < total_write_bytes) {
117     // TryPushBack() case: don't evict items.
118     return Status::ResourceExhausted();
119   }
120 
121   // Write the new entry into the ring buffer.
122   RawWrite(span(preamble_buf, user_preamble_bytes + length_bytes));
123   RawWrite(data);
124 
125   // Update all readers of the new count.
126   for (Reader& reader : readers_) {
127     reader.entry_count_++;
128   }
129   return OkStatus();
130 }
131 
GetOutput(span<byte> data_out,size_t * write_index)132 auto GetOutput(span<byte> data_out, size_t* write_index) {
133   return [data_out, write_index](span<const byte> src) -> Status {
134     size_t copy_size = std::min(data_out.size_bytes(), src.size_bytes());
135 
136     memcpy(data_out.data() + *write_index, src.data(), copy_size);
137     *write_index += copy_size;
138 
139     return (copy_size == src.size_bytes()) ? OkStatus()
140                                            : Status::ResourceExhausted();
141   };
142 }
143 
InternalPeekFront(const Reader & reader,span<byte> data,size_t * bytes_read_out) const144 Status PrefixedEntryRingBufferMulti::InternalPeekFront(
145     const Reader& reader, span<byte> data, size_t* bytes_read_out) const {
146   *bytes_read_out = 0;
147   return InternalRead(reader, GetOutput(data, bytes_read_out), false);
148 }
149 
InternalPeekFront(const Reader & reader,ReadOutput output) const150 Status PrefixedEntryRingBufferMulti::InternalPeekFront(
151     const Reader& reader, ReadOutput output) const {
152   return InternalRead(reader, output, false);
153 }
154 
InternalPeekFrontWithPreamble(const Reader & reader,span<byte> data,size_t * bytes_read_out) const155 Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
156     const Reader& reader, span<byte> data, size_t* bytes_read_out) const {
157   *bytes_read_out = 0;
158   return InternalRead(reader, GetOutput(data, bytes_read_out), true);
159 }
160 
InternalPeekFrontWithPreamble(const Reader & reader,ReadOutput output) const161 Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
162     const Reader& reader, ReadOutput output) const {
163   return InternalRead(reader, output, true);
164 }
165 
InternalPeekFrontPreamble(const Reader & reader,uint32_t & user_preamble_out) const166 Status PrefixedEntryRingBufferMulti::InternalPeekFrontPreamble(
167     const Reader& reader, uint32_t& user_preamble_out) const {
168   if (reader.entry_count_ == 0) {
169     return Status::OutOfRange();
170   }
171   // Figure out where to start reading (wrapped); accounting for preamble.
172   EntryInfo info = FrontEntryInfo(reader);
173   user_preamble_out = info.user_preamble;
174   return OkStatus();
175 }
176 
177 // TODO: b/235351046 - Consider whether this internal templating is required, or
178 // if we can simply promote GetOutput to a static function and remove the
179 // template. T should be similar to Status (*read_output)(span<const byte>)
180 template <typename T>
InternalRead(const Reader & reader,T read_output,bool include_preamble_in_output,uint32_t * user_preamble_out) const181 Status PrefixedEntryRingBufferMulti::InternalRead(
182     const Reader& reader,
183     T read_output,
184     bool include_preamble_in_output,
185     uint32_t* user_preamble_out) const {
186   if (buffer_ == nullptr) {
187     return Status::FailedPrecondition();
188   }
189   if (reader.entry_count_ == 0) {
190     return Status::OutOfRange();
191   }
192 
193   // Figure out where to start reading (wrapped); accounting for preamble.
194   EntryInfo info = FrontEntryInfo(reader);
195   size_t read_bytes = info.data_bytes;
196   size_t data_read_idx = reader.read_idx_;
197   if (user_preamble_out) {
198     *user_preamble_out = info.user_preamble;
199   }
200   if (include_preamble_in_output) {
201     read_bytes += info.preamble_bytes;
202   } else {
203     data_read_idx = IncrementIndex(data_read_idx, info.preamble_bytes);
204   }
205 
206   // Read bytes, stopping at the end of the buffer if this entry wraps.
207   size_t bytes_until_wrap = buffer_bytes_ - data_read_idx;
208   size_t bytes_to_copy = std::min(read_bytes, bytes_until_wrap);
209   Status status = read_output(span(buffer_ + data_read_idx, bytes_to_copy));
210 
211   // If the entry wrapped, read the remaining bytes.
212   if (status.ok() && (bytes_to_copy < read_bytes)) {
213     status = read_output(span(buffer_, read_bytes - bytes_to_copy));
214   }
215   return status;
216 }
217 
InternalPopFrontAll()218 void PrefixedEntryRingBufferMulti::InternalPopFrontAll() {
219   // Forcefully pop all readers. Find the slowest reader, which must have
220   // the highest entry count, then pop all readers that have the same count.
221   //
222   // It is expected that InternalPopFrontAll is called only when there is
223   // something to pop from at least one reader. If no readers exist, or all
224   // readers are caught up, this function will assert.
225   size_t entry_count = GetSlowestReader().entry_count_;
226   PW_DCHECK_INT_NE(entry_count, 0);
227   // Otherwise, pop the readers that have the largest value.
228   for (Reader& reader : readers_) {
229     if (reader.entry_count_ == entry_count) {
230       reader.PopFront()
231           .IgnoreError();  // TODO: b/242598609 - Handle Status properly
232     }
233   }
234 }
235 
GetSlowestReader() const236 const Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() const {
237   PW_DCHECK_INT_GT(readers_.size(), 0);
238   const Reader* slowest_reader = &(*readers_.begin());
239   for (const Reader& reader : readers_) {
240     if (reader.entry_count_ > slowest_reader->entry_count_) {
241       slowest_reader = &reader;
242     }
243   }
244   return *slowest_reader;
245 }
246 
Dering()247 Status PrefixedEntryRingBufferMulti::Dering() {
248   if (buffer_ == nullptr || readers_.empty()) {
249     return Status::FailedPrecondition();
250   }
251 
252   // Check if by luck we're already deringed.
253   Reader& slowest_reader = GetSlowestReaderWritable();
254   if (slowest_reader.read_idx_ == 0) {
255     return OkStatus();
256   }
257 
258   return InternalDering(slowest_reader);
259 }
260 
InternalDering(Reader & dering_reader)261 Status PrefixedEntryRingBufferMulti::InternalDering(Reader& dering_reader) {
262   if (buffer_ == nullptr || readers_.empty()) {
263     return Status::FailedPrecondition();
264   }
265 
266   auto buffer_span = span(buffer_, buffer_bytes_);
267   std::rotate(
268       buffer_span.begin(),
269       buffer_span.begin() + static_cast<span<std::byte>::difference_type>(
270                                 dering_reader.read_idx_),
271       buffer_span.end());
272 
273   // If the new index is past the end of the buffer,
274   // alias it back (wrap) to the start of the buffer.
275   if (write_idx_ < dering_reader.read_idx_) {
276     write_idx_ += buffer_bytes_;
277   }
278   write_idx_ -= dering_reader.read_idx_;
279 
280   for (Reader& reader : readers_) {
281     if (&reader == &dering_reader) {
282       continue;
283     }
284     if (reader.read_idx_ < dering_reader.read_idx_) {
285       reader.read_idx_ += buffer_bytes_;
286     }
287     reader.read_idx_ -= dering_reader.read_idx_;
288   }
289 
290   dering_reader.read_idx_ = 0;
291   return OkStatus();
292 }
293 
InternalPopFront(Reader & reader)294 Status PrefixedEntryRingBufferMulti::InternalPopFront(Reader& reader) {
295   if (buffer_ == nullptr) {
296     return Status::FailedPrecondition();
297   }
298   if (reader.entry_count_ == 0) {
299     return Status::OutOfRange();
300   }
301 
302   // Advance the read pointer past the front entry to the next one.
303   EntryInfo info = FrontEntryInfo(reader);
304   size_t entry_bytes = info.preamble_bytes + info.data_bytes;
305   size_t prev_read_idx = reader.read_idx_;
306   reader.read_idx_ = IncrementIndex(prev_read_idx, entry_bytes);
307   reader.entry_count_--;
308   return OkStatus();
309 }
310 
InternalFrontEntryDataSizeBytes(const Reader & reader) const311 size_t PrefixedEntryRingBufferMulti::InternalFrontEntryDataSizeBytes(
312     const Reader& reader) const {
313   if (reader.entry_count_ == 0) {
314     return 0;
315   }
316   return FrontEntryInfo(reader).data_bytes;
317 }
318 
InternalFrontEntryTotalSizeBytes(const Reader & reader) const319 size_t PrefixedEntryRingBufferMulti::InternalFrontEntryTotalSizeBytes(
320     const Reader& reader) const {
321   if (reader.entry_count_ == 0) {
322     return 0;
323   }
324   EntryInfo info = FrontEntryInfo(reader);
325   return info.preamble_bytes + info.data_bytes;
326 }
327 
328 PrefixedEntryRingBufferMulti::EntryInfo
FrontEntryInfo(const Reader & reader) const329 PrefixedEntryRingBufferMulti::FrontEntryInfo(const Reader& reader) const {
330   Result<PrefixedEntryRingBufferMulti::EntryInfo> entry_info =
331       RawFrontEntryInfo(reader.read_idx_);
332   PW_CHECK_OK(entry_info.status());
333   return entry_info.value();
334 }
335 
336 Result<PrefixedEntryRingBufferMulti::EntryInfo>
RawFrontEntryInfo(size_t source_idx) const337 PrefixedEntryRingBufferMulti::RawFrontEntryInfo(size_t source_idx) const {
338   // Entry headers consists of: (optional prefix byte, varint size, data...)
339 
340   // If a preamble exists, extract the varint and it's bytes in bytes.
341   size_t user_preamble_bytes = 0;
342   uint64_t user_preamble_data = 0;
343   byte varint_buf[varint::kMaxVarint32SizeBytes];
344   if (user_preamble_) {
345     RawRead(varint_buf, source_idx, varint::kMaxVarint32SizeBytes);
346     user_preamble_bytes = varint::Decode(varint_buf, &user_preamble_data);
347     if (user_preamble_bytes == 0u) {
348       return Status::DataLoss();
349     }
350   }
351 
352   // Read the entry header; extract the varint and it's bytes in bytes.
353   RawRead(varint_buf,
354           IncrementIndex(source_idx, user_preamble_bytes),
355           varint::kMaxVarint32SizeBytes);
356   uint64_t entry_bytes;
357   size_t length_bytes = varint::Decode(varint_buf, &entry_bytes);
358   if (length_bytes == 0u) {
359     return Status::DataLoss();
360   }
361 
362   EntryInfo info = {};
363   info.preamble_bytes = user_preamble_bytes + length_bytes;
364   info.user_preamble = static_cast<uint32_t>(user_preamble_data);
365   info.data_bytes = static_cast<size_t>(entry_bytes);
366   return info;
367 }
368 
369 // Comparisons ordered for more probable early exits, assuming the reader is
370 // not far behind the writer compared to the size of the ring.
RawAvailableBytes() const371 size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() const {
372   // Compute slowest reader. If no readers exist, the entire buffer can be
373   // written.
374   if (readers_.empty()) {
375     return buffer_bytes_;
376   }
377 
378   size_t read_idx = GetSlowestReader().read_idx_;
379   // Case: Not wrapped.
380   if (read_idx < write_idx_) {
381     return buffer_bytes_ - (write_idx_ - read_idx);
382   }
383   // Case: Wrapped
384   if (read_idx > write_idx_) {
385     return read_idx - write_idx_;
386   }
387   // Case: Matched read and write heads; empty or full.
388   for (const Reader& reader : readers_) {
389     if (reader.read_idx_ == read_idx && reader.entry_count_ != 0) {
390       return 0;
391     }
392   }
393   return buffer_bytes_;
394 }
395 
RawWrite(span<const std::byte> source)396 void PrefixedEntryRingBufferMulti::RawWrite(span<const std::byte> source) {
397   if (source.size_bytes() == 0) {
398     return;
399   }
400 
401   // Write until the end of the source or the backing buffer.
402   size_t bytes_until_wrap = buffer_bytes_ - write_idx_;
403   size_t bytes_to_copy = std::min(source.size(), bytes_until_wrap);
404   memcpy(buffer_ + write_idx_, source.data(), bytes_to_copy);
405 
406   // If there wasn't space in the backing buffer, wrap to the front.
407   if (bytes_to_copy < source.size()) {
408     memcpy(
409         buffer_, source.data() + bytes_to_copy, source.size() - bytes_to_copy);
410   }
411   write_idx_ = IncrementIndex(write_idx_, source.size());
412 }
413 
RawRead(byte * destination,size_t source_idx,size_t length_bytes) const414 void PrefixedEntryRingBufferMulti::RawRead(byte* destination,
415                                            size_t source_idx,
416                                            size_t length_bytes) const {
417   if (length_bytes == 0) {
418     return;
419   }
420 
421   // Read the pre-wrap bytes.
422   size_t bytes_until_wrap = buffer_bytes_ - source_idx;
423   size_t bytes_to_copy = std::min(length_bytes, bytes_until_wrap);
424   memcpy(destination, buffer_ + source_idx, bytes_to_copy);
425 
426   // Read the post-wrap bytes, if needed.
427   if (bytes_to_copy < length_bytes) {
428     memcpy(destination + bytes_to_copy, buffer_, length_bytes - bytes_to_copy);
429   }
430 }
431 
IncrementIndex(size_t index,size_t count) const432 size_t PrefixedEntryRingBufferMulti::IncrementIndex(size_t index,
433                                                     size_t count) const {
434   // Note: This doesn't use modulus (%) since the branch is cheaper, and we
435   // guarantee that count will never be greater than buffer_bytes_.
436   index += count;
437   if (index > buffer_bytes_) {
438     index -= buffer_bytes_;
439   }
440   return index;
441 }
442 
PeekFrontWithPreamble(span<byte> data,uint32_t & user_preamble_out,size_t & entry_bytes_read_out) const443 Status PrefixedEntryRingBufferMulti::Reader::PeekFrontWithPreamble(
444     span<byte> data,
445     uint32_t& user_preamble_out,
446     size_t& entry_bytes_read_out) const {
447   entry_bytes_read_out = 0;
448   return buffer_->InternalRead(
449       *this, GetOutput(data, &entry_bytes_read_out), false, &user_preamble_out);
450 }
451 
EntriesSize() const452 size_t PrefixedEntryRingBufferMulti::Reader::EntriesSize() const {
453   // Case: Not wrapped.
454   if (read_idx_ < buffer_->write_idx_) {
455     return buffer_->write_idx_ - read_idx_;
456   }
457   // Case: Wrapped.
458   if (read_idx_ > buffer_->write_idx_) {
459     return buffer_->buffer_bytes_ - (read_idx_ - buffer_->write_idx_);
460   }
461 
462   // No entries remaining.
463   if (entry_count_ == 0) {
464     return 0;
465   }
466 
467   return buffer_->buffer_bytes_;
468 }
469 
operator ++()470 iterator& iterator::operator++() {
471   PW_DCHECK_OK(iteration_status_);
472   PW_DCHECK_INT_NE(entry_count_, 0);
473 
474   Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
475   if (!info.status().ok()) {
476     SkipToEnd(info.status());
477     return *this;
478   }
479 
480   // It is guaranteed that the buffer is deringed at this point.
481   read_idx_ += info.value().preamble_bytes + info.value().data_bytes;
482   entry_count_--;
483 
484   if (entry_count_ == 0) {
485     SkipToEnd(OkStatus());
486     return *this;
487   }
488 
489   if (read_idx_ >= ring_buffer_->TotalUsedBytes()) {
490     SkipToEnd(Status::DataLoss());
491     return *this;
492   }
493 
494   info = ring_buffer_->RawFrontEntryInfo(read_idx_);
495   if (!info.status().ok()) {
496     SkipToEnd(info.status());
497     return *this;
498   }
499   return *this;
500 }
501 
operator --()502 iterator& iterator::operator--() {
503   PW_DCHECK_OK(iteration_status_);
504   PW_DCHECK_INT_NE(entry_count_, 0);
505 
506   Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
507   if (!info.status().ok()) {
508     SkipToEnd(info.status());
509     return *this;
510   }
511 
512   // It is guaranteed that the buffer is deringed at this point.
513   read_idx_ -= info.value().preamble_bytes + info.value().data_bytes;
514   entry_count_++;
515 
516   // If read_idx_ is larger that the total bytes, it's wrapped
517   // as the iterator has decremented past the last element.
518   if (read_idx_ > ring_buffer_->TotalSizeBytes()) {
519     SkipToEnd(Status::DataLoss());
520     return *this;
521   }
522 
523   info = ring_buffer_->RawFrontEntryInfo(read_idx_);
524   if (!info.status().ok()) {
525     SkipToEnd(info.status());
526     return *this;
527   }
528   return *this;
529 }
530 
operator *() const531 const Entry& iterator::operator*() const {
532   PW_DCHECK_OK(iteration_status_);
533   PW_DCHECK_INT_NE(entry_count_, 0);
534 
535   Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
536   PW_DCHECK_OK(info.status());
537 
538   entry_ = {
539       .buffer = span<const byte>(
540           ring_buffer_->buffer_ + read_idx_ + info.value().preamble_bytes,
541           info.value().data_bytes),
542       .preamble = info.value().user_preamble,
543   };
544   return entry_;
545 }
546 
547 }  // namespace ring_buffer
548 }  // namespace pw
549