1*61c4878aSAndroid Build Coastguard Worker // Copyright 2021 The Pigweed Authors
2*61c4878aSAndroid Build Coastguard Worker //
3*61c4878aSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4*61c4878aSAndroid Build Coastguard Worker // use this file except in compliance with the License. You may obtain a copy of
5*61c4878aSAndroid Build Coastguard Worker // the License at
6*61c4878aSAndroid Build Coastguard Worker //
7*61c4878aSAndroid Build Coastguard Worker // https://www.apache.org/licenses/LICENSE-2.0
8*61c4878aSAndroid Build Coastguard Worker //
9*61c4878aSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*61c4878aSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11*61c4878aSAndroid Build Coastguard Worker // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12*61c4878aSAndroid Build Coastguard Worker // License for the specific language governing permissions and limitations under
13*61c4878aSAndroid Build Coastguard Worker // the License.
14*61c4878aSAndroid Build Coastguard Worker #include "pw_multisink/multisink.h"
15*61c4878aSAndroid Build Coastguard Worker
16*61c4878aSAndroid Build Coastguard Worker #include <cstring>
17*61c4878aSAndroid Build Coastguard Worker
18*61c4878aSAndroid Build Coastguard Worker #include "pw_assert/check.h"
19*61c4878aSAndroid Build Coastguard Worker #include "pw_bytes/span.h"
20*61c4878aSAndroid Build Coastguard Worker #include "pw_function/function.h"
21*61c4878aSAndroid Build Coastguard Worker #include "pw_log/log.h"
22*61c4878aSAndroid Build Coastguard Worker #include "pw_result/result.h"
23*61c4878aSAndroid Build Coastguard Worker #include "pw_status/status.h"
24*61c4878aSAndroid Build Coastguard Worker #include "pw_status/try.h"
25*61c4878aSAndroid Build Coastguard Worker #include "pw_varint/varint.h"
26*61c4878aSAndroid Build Coastguard Worker
27*61c4878aSAndroid Build Coastguard Worker namespace pw {
28*61c4878aSAndroid Build Coastguard Worker namespace multisink {
29*61c4878aSAndroid Build Coastguard Worker
HandleEntry(ConstByteSpan entry)30*61c4878aSAndroid Build Coastguard Worker void MultiSink::HandleEntry(ConstByteSpan entry) {
31*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
32*61c4878aSAndroid Build Coastguard Worker const Status push_back_status = ring_buffer_.PushBack(entry, sequence_id_++);
33*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_OK(push_back_status);
34*61c4878aSAndroid Build Coastguard Worker NotifyListeners();
35*61c4878aSAndroid Build Coastguard Worker }
36*61c4878aSAndroid Build Coastguard Worker
HandleDropped(uint32_t drop_count)37*61c4878aSAndroid Build Coastguard Worker void MultiSink::HandleDropped(uint32_t drop_count) {
38*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
39*61c4878aSAndroid Build Coastguard Worker // Updating the sequence ID helps identify where the ingress drop happend when
40*61c4878aSAndroid Build Coastguard Worker // a drain peeks or pops.
41*61c4878aSAndroid Build Coastguard Worker sequence_id_ += drop_count;
42*61c4878aSAndroid Build Coastguard Worker total_ingress_drops_ += drop_count;
43*61c4878aSAndroid Build Coastguard Worker NotifyListeners();
44*61c4878aSAndroid Build Coastguard Worker }
45*61c4878aSAndroid Build Coastguard Worker
PopEntry(Drain & drain,const Drain::PeekedEntry & entry)46*61c4878aSAndroid Build Coastguard Worker Status MultiSink::PopEntry(Drain& drain, const Drain::PeekedEntry& entry) {
47*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
48*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_PTR_EQ(drain.multisink_, this);
49*61c4878aSAndroid Build Coastguard Worker
50*61c4878aSAndroid Build Coastguard Worker // Ignore the call if the entry has been handled already.
51*61c4878aSAndroid Build Coastguard Worker if (entry.sequence_id() == drain.last_handled_sequence_id_) {
52*61c4878aSAndroid Build Coastguard Worker return OkStatus();
53*61c4878aSAndroid Build Coastguard Worker }
54*61c4878aSAndroid Build Coastguard Worker
55*61c4878aSAndroid Build Coastguard Worker uint32_t next_entry_sequence_id;
56*61c4878aSAndroid Build Coastguard Worker Status peek_status = drain.reader_.PeekFrontPreamble(next_entry_sequence_id);
57*61c4878aSAndroid Build Coastguard Worker if (!peek_status.ok()) {
58*61c4878aSAndroid Build Coastguard Worker // Ignore errors if the multisink is empty.
59*61c4878aSAndroid Build Coastguard Worker if (peek_status.IsOutOfRange()) {
60*61c4878aSAndroid Build Coastguard Worker return OkStatus();
61*61c4878aSAndroid Build Coastguard Worker }
62*61c4878aSAndroid Build Coastguard Worker return peek_status;
63*61c4878aSAndroid Build Coastguard Worker }
64*61c4878aSAndroid Build Coastguard Worker if (next_entry_sequence_id == entry.sequence_id()) {
65*61c4878aSAndroid Build Coastguard Worker // A crash should not happen, since the peek was successful and `lock_` is
66*61c4878aSAndroid Build Coastguard Worker // still held, there shouldn't be any modifications to the multisink in
67*61c4878aSAndroid Build Coastguard Worker // between peeking and popping.
68*61c4878aSAndroid Build Coastguard Worker PW_CHECK_OK(drain.reader_.PopFront());
69*61c4878aSAndroid Build Coastguard Worker }
70*61c4878aSAndroid Build Coastguard Worker // If the entry's sequence id is not the next one it means that the
71*61c4878aSAndroid Build Coastguard Worker // multisink advanced since PeekEntry() was called. Advance the last handled
72*61c4878aSAndroid Build Coastguard Worker // sequence id to the passed entry anyway to mark the fact that the dropped
73*61c4878aSAndroid Build Coastguard Worker // messages reported on PeekEntry() are handled.
74*61c4878aSAndroid Build Coastguard Worker drain.last_handled_sequence_id_ = entry.sequence_id();
75*61c4878aSAndroid Build Coastguard Worker return OkStatus();
76*61c4878aSAndroid Build Coastguard Worker }
77*61c4878aSAndroid Build Coastguard Worker
PeekOrPopEntry(Drain & drain,ByteSpan buffer,Request request,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out,uint32_t & entry_sequence_id_out)78*61c4878aSAndroid Build Coastguard Worker Result<ConstByteSpan> MultiSink::PeekOrPopEntry(
79*61c4878aSAndroid Build Coastguard Worker Drain& drain,
80*61c4878aSAndroid Build Coastguard Worker ByteSpan buffer,
81*61c4878aSAndroid Build Coastguard Worker Request request,
82*61c4878aSAndroid Build Coastguard Worker uint32_t& drain_drop_count_out,
83*61c4878aSAndroid Build Coastguard Worker uint32_t& ingress_drop_count_out,
84*61c4878aSAndroid Build Coastguard Worker uint32_t& entry_sequence_id_out) {
85*61c4878aSAndroid Build Coastguard Worker size_t bytes_read = 0;
86*61c4878aSAndroid Build Coastguard Worker entry_sequence_id_out = 0;
87*61c4878aSAndroid Build Coastguard Worker drain_drop_count_out = 0;
88*61c4878aSAndroid Build Coastguard Worker ingress_drop_count_out = 0;
89*61c4878aSAndroid Build Coastguard Worker
90*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
91*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_PTR_EQ(drain.multisink_, this);
92*61c4878aSAndroid Build Coastguard Worker
93*61c4878aSAndroid Build Coastguard Worker const Status peek_status = drain.reader_.PeekFrontWithPreamble(
94*61c4878aSAndroid Build Coastguard Worker buffer, entry_sequence_id_out, bytes_read);
95*61c4878aSAndroid Build Coastguard Worker
96*61c4878aSAndroid Build Coastguard Worker if (peek_status.IsOutOfRange()) {
97*61c4878aSAndroid Build Coastguard Worker // If the drain has caught up, report the last handled sequence ID so that
98*61c4878aSAndroid Build Coastguard Worker // it can still process any dropped entries.
99*61c4878aSAndroid Build Coastguard Worker // Negation overflow is by design.
100*61c4878aSAndroid Build Coastguard Worker /// -fsanitize-undefined-ignore-overflow-pattern=negated-unsigned-const
101*61c4878aSAndroid Build Coastguard Worker entry_sequence_id_out = sequence_id_ - 1;
102*61c4878aSAndroid Build Coastguard Worker } else if (!peek_status.ok()) {
103*61c4878aSAndroid Build Coastguard Worker // Discard the entry if the result isn't OK or OUT_OF_RANGE and exit, as the
104*61c4878aSAndroid Build Coastguard Worker // entry_sequence_id_out cannot be used for computation. Later invocations
105*61c4878aSAndroid Build Coastguard Worker // will calculate the drop count.
106*61c4878aSAndroid Build Coastguard Worker PW_CHECK(drain.reader_.PopFront().ok());
107*61c4878aSAndroid Build Coastguard Worker return peek_status;
108*61c4878aSAndroid Build Coastguard Worker }
109*61c4878aSAndroid Build Coastguard Worker
110*61c4878aSAndroid Build Coastguard Worker // Compute the drop count delta by comparing this entry's sequence ID with the
111*61c4878aSAndroid Build Coastguard Worker // last sequence ID this drain successfully read.
112*61c4878aSAndroid Build Coastguard Worker //
113*61c4878aSAndroid Build Coastguard Worker // The drop count calculation simply computes the difference between the
114*61c4878aSAndroid Build Coastguard Worker // current and last sequence IDs. Consecutive successful reads will always
115*61c4878aSAndroid Build Coastguard Worker // differ by one at least, so it is subtracted out. If the read was not
116*61c4878aSAndroid Build Coastguard Worker // successful, the difference is not adjusted.
117*61c4878aSAndroid Build Coastguard Worker drain_drop_count_out = entry_sequence_id_out -
118*61c4878aSAndroid Build Coastguard Worker drain.last_handled_sequence_id_ -
119*61c4878aSAndroid Build Coastguard Worker (peek_status.ok() ? 1 : 0);
120*61c4878aSAndroid Build Coastguard Worker
121*61c4878aSAndroid Build Coastguard Worker // Only report the ingress drop count when the drain catches up to where the
122*61c4878aSAndroid Build Coastguard Worker // drop happened, accounting only for the drops found and no more, as
123*61c4878aSAndroid Build Coastguard Worker // indicated by the gap in sequence IDs.
124*61c4878aSAndroid Build Coastguard Worker if (drain_drop_count_out > 0) {
125*61c4878aSAndroid Build Coastguard Worker ingress_drop_count_out =
126*61c4878aSAndroid Build Coastguard Worker std::min(drain_drop_count_out,
127*61c4878aSAndroid Build Coastguard Worker total_ingress_drops_ - drain.last_handled_ingress_drop_count_);
128*61c4878aSAndroid Build Coastguard Worker // Remove the ingress drop count duplicated in drain_drop_count_out.
129*61c4878aSAndroid Build Coastguard Worker drain_drop_count_out -= ingress_drop_count_out;
130*61c4878aSAndroid Build Coastguard Worker // Check if all the ingress drops were reported.
131*61c4878aSAndroid Build Coastguard Worker drain.last_handled_ingress_drop_count_ =
132*61c4878aSAndroid Build Coastguard Worker total_ingress_drops_ > ingress_drop_count_out
133*61c4878aSAndroid Build Coastguard Worker ? total_ingress_drops_ - ingress_drop_count_out
134*61c4878aSAndroid Build Coastguard Worker : total_ingress_drops_;
135*61c4878aSAndroid Build Coastguard Worker }
136*61c4878aSAndroid Build Coastguard Worker
137*61c4878aSAndroid Build Coastguard Worker // The Peek above may have failed due to OutOfRange, now that we've set the
138*61c4878aSAndroid Build Coastguard Worker // drop count see if we should return before attempting to pop.
139*61c4878aSAndroid Build Coastguard Worker if (peek_status.IsOutOfRange()) {
140*61c4878aSAndroid Build Coastguard Worker // No more entries, update the drain.
141*61c4878aSAndroid Build Coastguard Worker drain.last_handled_sequence_id_ = entry_sequence_id_out;
142*61c4878aSAndroid Build Coastguard Worker return peek_status;
143*61c4878aSAndroid Build Coastguard Worker }
144*61c4878aSAndroid Build Coastguard Worker if (request == Request::kPop) {
145*61c4878aSAndroid Build Coastguard Worker PW_CHECK(drain.reader_.PopFront().ok());
146*61c4878aSAndroid Build Coastguard Worker drain.last_handled_sequence_id_ = entry_sequence_id_out;
147*61c4878aSAndroid Build Coastguard Worker }
148*61c4878aSAndroid Build Coastguard Worker return as_bytes(buffer.first(bytes_read));
149*61c4878aSAndroid Build Coastguard Worker }
150*61c4878aSAndroid Build Coastguard Worker
AttachDrain(Drain & drain)151*61c4878aSAndroid Build Coastguard Worker void MultiSink::AttachDrain(Drain& drain) {
152*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
153*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_PTR_EQ(drain.multisink_, nullptr);
154*61c4878aSAndroid Build Coastguard Worker drain.multisink_ = this;
155*61c4878aSAndroid Build Coastguard Worker
156*61c4878aSAndroid Build Coastguard Worker PW_CHECK_OK(ring_buffer_.AttachReader(drain.reader_));
157*61c4878aSAndroid Build Coastguard Worker if (&drain == &oldest_entry_drain_) {
158*61c4878aSAndroid Build Coastguard Worker // Negation overflow is by design.
159*61c4878aSAndroid Build Coastguard Worker /// -fsanitize-undefined-ignore-overflow-pattern=negated-unsigned-const
160*61c4878aSAndroid Build Coastguard Worker drain.last_handled_sequence_id_ = sequence_id_ - 1;
161*61c4878aSAndroid Build Coastguard Worker } else {
162*61c4878aSAndroid Build Coastguard Worker drain.last_handled_sequence_id_ =
163*61c4878aSAndroid Build Coastguard Worker oldest_entry_drain_.last_handled_sequence_id_;
164*61c4878aSAndroid Build Coastguard Worker }
165*61c4878aSAndroid Build Coastguard Worker drain.last_peek_sequence_id_ = drain.last_handled_sequence_id_;
166*61c4878aSAndroid Build Coastguard Worker drain.last_handled_ingress_drop_count_ = 0;
167*61c4878aSAndroid Build Coastguard Worker }
168*61c4878aSAndroid Build Coastguard Worker
DetachDrain(Drain & drain)169*61c4878aSAndroid Build Coastguard Worker void MultiSink::DetachDrain(Drain& drain) {
170*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
171*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_PTR_EQ(drain.multisink_, this);
172*61c4878aSAndroid Build Coastguard Worker drain.multisink_ = nullptr;
173*61c4878aSAndroid Build Coastguard Worker PW_CHECK_OK(ring_buffer_.DetachReader(drain.reader_),
174*61c4878aSAndroid Build Coastguard Worker "The drain wasn't already attached.");
175*61c4878aSAndroid Build Coastguard Worker }
176*61c4878aSAndroid Build Coastguard Worker
AttachListener(Listener & listener)177*61c4878aSAndroid Build Coastguard Worker void MultiSink::AttachListener(Listener& listener) {
178*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
179*61c4878aSAndroid Build Coastguard Worker listeners_.push_back(listener);
180*61c4878aSAndroid Build Coastguard Worker // Notify the newly added entry, in case there are items in the sink.
181*61c4878aSAndroid Build Coastguard Worker listener.OnNewEntryAvailable();
182*61c4878aSAndroid Build Coastguard Worker }
183*61c4878aSAndroid Build Coastguard Worker
DetachListener(Listener & listener)184*61c4878aSAndroid Build Coastguard Worker void MultiSink::DetachListener(Listener& listener) {
185*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
186*61c4878aSAndroid Build Coastguard Worker [[maybe_unused]] bool was_detached = listeners_.remove(listener);
187*61c4878aSAndroid Build Coastguard Worker PW_DCHECK(was_detached, "The listener was already attached.");
188*61c4878aSAndroid Build Coastguard Worker }
189*61c4878aSAndroid Build Coastguard Worker
Clear()190*61c4878aSAndroid Build Coastguard Worker void MultiSink::Clear() {
191*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(lock_);
192*61c4878aSAndroid Build Coastguard Worker ring_buffer_.Clear();
193*61c4878aSAndroid Build Coastguard Worker }
194*61c4878aSAndroid Build Coastguard Worker
NotifyListeners()195*61c4878aSAndroid Build Coastguard Worker void MultiSink::NotifyListeners() {
196*61c4878aSAndroid Build Coastguard Worker for (auto& listener : listeners_) {
197*61c4878aSAndroid Build Coastguard Worker listener.OnNewEntryAvailable();
198*61c4878aSAndroid Build Coastguard Worker }
199*61c4878aSAndroid Build Coastguard Worker }
200*61c4878aSAndroid Build Coastguard Worker
UnsafeForEachEntry(const Function<void (ConstByteSpan)> & callback,size_t max_num_entries)201*61c4878aSAndroid Build Coastguard Worker Status MultiSink::UnsafeForEachEntry(
202*61c4878aSAndroid Build Coastguard Worker const Function<void(ConstByteSpan)>& callback, size_t max_num_entries) {
203*61c4878aSAndroid Build Coastguard Worker MultiSink::UnsafeIterationWrapper multisink_iteration = UnsafeIteration();
204*61c4878aSAndroid Build Coastguard Worker
205*61c4878aSAndroid Build Coastguard Worker // First count the number of entries.
206*61c4878aSAndroid Build Coastguard Worker size_t num_entries = 0;
207*61c4878aSAndroid Build Coastguard Worker for ([[maybe_unused]] ConstByteSpan entry : multisink_iteration) {
208*61c4878aSAndroid Build Coastguard Worker num_entries++;
209*61c4878aSAndroid Build Coastguard Worker }
210*61c4878aSAndroid Build Coastguard Worker
211*61c4878aSAndroid Build Coastguard Worker // Log up to the max number of logs to avoid overflowing the crash log
212*61c4878aSAndroid Build Coastguard Worker // writer.
213*61c4878aSAndroid Build Coastguard Worker const size_t first_logged_offset =
214*61c4878aSAndroid Build Coastguard Worker max_num_entries > num_entries ? 0 : num_entries - max_num_entries;
215*61c4878aSAndroid Build Coastguard Worker pw::multisink::MultiSink::iterator it = multisink_iteration.begin();
216*61c4878aSAndroid Build Coastguard Worker for (size_t offset = 0; it != multisink_iteration.end(); ++it, ++offset) {
217*61c4878aSAndroid Build Coastguard Worker if (offset < first_logged_offset) {
218*61c4878aSAndroid Build Coastguard Worker continue; // Skip this log.
219*61c4878aSAndroid Build Coastguard Worker }
220*61c4878aSAndroid Build Coastguard Worker callback(*it);
221*61c4878aSAndroid Build Coastguard Worker }
222*61c4878aSAndroid Build Coastguard Worker if (!it.status().ok()) {
223*61c4878aSAndroid Build Coastguard Worker PW_LOG_WARN("Multisink corruption detected, some entries may be missing");
224*61c4878aSAndroid Build Coastguard Worker return Status::DataLoss();
225*61c4878aSAndroid Build Coastguard Worker }
226*61c4878aSAndroid Build Coastguard Worker
227*61c4878aSAndroid Build Coastguard Worker return OkStatus();
228*61c4878aSAndroid Build Coastguard Worker }
229*61c4878aSAndroid Build Coastguard Worker
UnsafeForEachEntryFromEnd(const Function<void (ConstByteSpan)> & callback,size_t max_size_bytes)230*61c4878aSAndroid Build Coastguard Worker Status MultiSink::UnsafeForEachEntryFromEnd(
231*61c4878aSAndroid Build Coastguard Worker const Function<void(ConstByteSpan)>& callback, size_t max_size_bytes) {
232*61c4878aSAndroid Build Coastguard Worker MultiSink::UnsafeIterationWrapper multisink_iteration = UnsafeIteration();
233*61c4878aSAndroid Build Coastguard Worker
234*61c4878aSAndroid Build Coastguard Worker // First count the number of entries and total size of the entries.
235*61c4878aSAndroid Build Coastguard Worker size_t num_entries = 0;
236*61c4878aSAndroid Build Coastguard Worker size_t total_bytes = 0;
237*61c4878aSAndroid Build Coastguard Worker iterator it = multisink_iteration.begin();
238*61c4878aSAndroid Build Coastguard Worker iterator last_elem_it;
239*61c4878aSAndroid Build Coastguard Worker for (; it != multisink_iteration.end(); ++it) {
240*61c4878aSAndroid Build Coastguard Worker num_entries++;
241*61c4878aSAndroid Build Coastguard Worker total_bytes += (*it).size();
242*61c4878aSAndroid Build Coastguard Worker last_elem_it = it;
243*61c4878aSAndroid Build Coastguard Worker }
244*61c4878aSAndroid Build Coastguard Worker
245*61c4878aSAndroid Build Coastguard Worker size_t max_num_entries = std::numeric_limits<size_t>::max();
246*61c4878aSAndroid Build Coastguard Worker // All entries won't fit in the available space, so reverse iterate
247*61c4878aSAndroid Build Coastguard Worker // from the end to calculate the number of elements from the end
248*61c4878aSAndroid Build Coastguard Worker // which will fit in the available space.
249*61c4878aSAndroid Build Coastguard Worker if (total_bytes > max_size_bytes) {
250*61c4878aSAndroid Build Coastguard Worker total_bytes = 0;
251*61c4878aSAndroid Build Coastguard Worker max_num_entries = 0;
252*61c4878aSAndroid Build Coastguard Worker while (total_bytes <= max_size_bytes) {
253*61c4878aSAndroid Build Coastguard Worker total_bytes += (*last_elem_it).size();
254*61c4878aSAndroid Build Coastguard Worker last_elem_it--;
255*61c4878aSAndroid Build Coastguard Worker max_num_entries++;
256*61c4878aSAndroid Build Coastguard Worker }
257*61c4878aSAndroid Build Coastguard Worker }
258*61c4878aSAndroid Build Coastguard Worker
259*61c4878aSAndroid Build Coastguard Worker // Log up to the max number of logs to avoid overflowing the crash log
260*61c4878aSAndroid Build Coastguard Worker // writer.
261*61c4878aSAndroid Build Coastguard Worker const size_t first_logged_offset =
262*61c4878aSAndroid Build Coastguard Worker max_num_entries > num_entries ? 0 : num_entries - max_num_entries;
263*61c4878aSAndroid Build Coastguard Worker it = multisink_iteration.begin();
264*61c4878aSAndroid Build Coastguard Worker for (size_t offset = 0; it != multisink_iteration.end(); ++it, ++offset) {
265*61c4878aSAndroid Build Coastguard Worker if (offset < first_logged_offset) {
266*61c4878aSAndroid Build Coastguard Worker continue; // Skip this log.
267*61c4878aSAndroid Build Coastguard Worker }
268*61c4878aSAndroid Build Coastguard Worker callback(*it);
269*61c4878aSAndroid Build Coastguard Worker }
270*61c4878aSAndroid Build Coastguard Worker if (!it.status().ok()) {
271*61c4878aSAndroid Build Coastguard Worker PW_LOG_WARN("Multisink corruption detected, some entries may be missing");
272*61c4878aSAndroid Build Coastguard Worker return Status::DataLoss();
273*61c4878aSAndroid Build Coastguard Worker }
274*61c4878aSAndroid Build Coastguard Worker
275*61c4878aSAndroid Build Coastguard Worker return OkStatus();
276*61c4878aSAndroid Build Coastguard Worker }
277*61c4878aSAndroid Build Coastguard Worker
PopEntry(const PeekedEntry & entry)278*61c4878aSAndroid Build Coastguard Worker Status MultiSink::Drain::PopEntry(const PeekedEntry& entry) {
279*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_NOTNULL(multisink_);
280*61c4878aSAndroid Build Coastguard Worker return multisink_->PopEntry(*this, entry);
281*61c4878aSAndroid Build Coastguard Worker }
282*61c4878aSAndroid Build Coastguard Worker
PeekEntry(ByteSpan buffer,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out)283*61c4878aSAndroid Build Coastguard Worker Result<MultiSink::Drain::PeekedEntry> MultiSink::Drain::PeekEntry(
284*61c4878aSAndroid Build Coastguard Worker ByteSpan buffer,
285*61c4878aSAndroid Build Coastguard Worker uint32_t& drain_drop_count_out,
286*61c4878aSAndroid Build Coastguard Worker uint32_t& ingress_drop_count_out) {
287*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_NOTNULL(multisink_);
288*61c4878aSAndroid Build Coastguard Worker uint32_t entry_sequence_id_out;
289*61c4878aSAndroid Build Coastguard Worker Result<ConstByteSpan> peek_result =
290*61c4878aSAndroid Build Coastguard Worker multisink_->PeekOrPopEntry(*this,
291*61c4878aSAndroid Build Coastguard Worker buffer,
292*61c4878aSAndroid Build Coastguard Worker Request::kPeek,
293*61c4878aSAndroid Build Coastguard Worker drain_drop_count_out,
294*61c4878aSAndroid Build Coastguard Worker ingress_drop_count_out,
295*61c4878aSAndroid Build Coastguard Worker entry_sequence_id_out);
296*61c4878aSAndroid Build Coastguard Worker if (!peek_result.ok()) {
297*61c4878aSAndroid Build Coastguard Worker return peek_result.status();
298*61c4878aSAndroid Build Coastguard Worker }
299*61c4878aSAndroid Build Coastguard Worker return PeekedEntry(peek_result.value(), entry_sequence_id_out);
300*61c4878aSAndroid Build Coastguard Worker }
301*61c4878aSAndroid Build Coastguard Worker
PopEntry(ByteSpan buffer,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out)302*61c4878aSAndroid Build Coastguard Worker Result<ConstByteSpan> MultiSink::Drain::PopEntry(
303*61c4878aSAndroid Build Coastguard Worker ByteSpan buffer,
304*61c4878aSAndroid Build Coastguard Worker uint32_t& drain_drop_count_out,
305*61c4878aSAndroid Build Coastguard Worker uint32_t& ingress_drop_count_out) {
306*61c4878aSAndroid Build Coastguard Worker PW_DCHECK_NOTNULL(multisink_);
307*61c4878aSAndroid Build Coastguard Worker uint32_t entry_sequence_id_out;
308*61c4878aSAndroid Build Coastguard Worker return multisink_->PeekOrPopEntry(*this,
309*61c4878aSAndroid Build Coastguard Worker buffer,
310*61c4878aSAndroid Build Coastguard Worker Request::kPop,
311*61c4878aSAndroid Build Coastguard Worker drain_drop_count_out,
312*61c4878aSAndroid Build Coastguard Worker ingress_drop_count_out,
313*61c4878aSAndroid Build Coastguard Worker entry_sequence_id_out);
314*61c4878aSAndroid Build Coastguard Worker }
315*61c4878aSAndroid Build Coastguard Worker
316*61c4878aSAndroid Build Coastguard Worker } // namespace multisink
317*61c4878aSAndroid Build Coastguard Worker } // namespace pw
318