xref: /aosp_15_r20/external/pigweed/pw_multisink/multisink.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #include "pw_multisink/multisink.h"
15 
16 #include <cstring>
17 
18 #include "pw_assert/check.h"
19 #include "pw_bytes/span.h"
20 #include "pw_function/function.h"
21 #include "pw_log/log.h"
22 #include "pw_result/result.h"
23 #include "pw_status/status.h"
24 #include "pw_status/try.h"
25 #include "pw_varint/varint.h"
26 
27 namespace pw {
28 namespace multisink {
29 
HandleEntry(ConstByteSpan entry)30 void MultiSink::HandleEntry(ConstByteSpan entry) {
31   std::lock_guard lock(lock_);
32   const Status push_back_status = ring_buffer_.PushBack(entry, sequence_id_++);
33   PW_DCHECK_OK(push_back_status);
34   NotifyListeners();
35 }
36 
HandleDropped(uint32_t drop_count)37 void MultiSink::HandleDropped(uint32_t drop_count) {
38   std::lock_guard lock(lock_);
39   // Updating the sequence ID helps identify where the ingress drop happend when
40   // a drain peeks or pops.
41   sequence_id_ += drop_count;
42   total_ingress_drops_ += drop_count;
43   NotifyListeners();
44 }
45 
PopEntry(Drain & drain,const Drain::PeekedEntry & entry)46 Status MultiSink::PopEntry(Drain& drain, const Drain::PeekedEntry& entry) {
47   std::lock_guard lock(lock_);
48   PW_DCHECK_PTR_EQ(drain.multisink_, this);
49 
50   // Ignore the call if the entry has been handled already.
51   if (entry.sequence_id() == drain.last_handled_sequence_id_) {
52     return OkStatus();
53   }
54 
55   uint32_t next_entry_sequence_id;
56   Status peek_status = drain.reader_.PeekFrontPreamble(next_entry_sequence_id);
57   if (!peek_status.ok()) {
58     // Ignore errors if the multisink is empty.
59     if (peek_status.IsOutOfRange()) {
60       return OkStatus();
61     }
62     return peek_status;
63   }
64   if (next_entry_sequence_id == entry.sequence_id()) {
65     // A crash should not happen, since the peek was successful and `lock_` is
66     // still held, there shouldn't be any modifications to the multisink in
67     // between peeking and popping.
68     PW_CHECK_OK(drain.reader_.PopFront());
69   }
70   // If the entry's sequence id is not the next one it means that the
71   // multisink advanced since PeekEntry() was called. Advance the last handled
72   // sequence id to the passed entry anyway to mark the fact that the dropped
73   // messages reported on PeekEntry() are handled.
74   drain.last_handled_sequence_id_ = entry.sequence_id();
75   return OkStatus();
76 }
77 
PeekOrPopEntry(Drain & drain,ByteSpan buffer,Request request,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out,uint32_t & entry_sequence_id_out)78 Result<ConstByteSpan> MultiSink::PeekOrPopEntry(
79     Drain& drain,
80     ByteSpan buffer,
81     Request request,
82     uint32_t& drain_drop_count_out,
83     uint32_t& ingress_drop_count_out,
84     uint32_t& entry_sequence_id_out) {
85   size_t bytes_read = 0;
86   entry_sequence_id_out = 0;
87   drain_drop_count_out = 0;
88   ingress_drop_count_out = 0;
89 
90   std::lock_guard lock(lock_);
91   PW_DCHECK_PTR_EQ(drain.multisink_, this);
92 
93   const Status peek_status = drain.reader_.PeekFrontWithPreamble(
94       buffer, entry_sequence_id_out, bytes_read);
95 
96   if (peek_status.IsOutOfRange()) {
97     // If the drain has caught up, report the last handled sequence ID so that
98     // it can still process any dropped entries.
99     // Negation overflow is by design.
100     /// -fsanitize-undefined-ignore-overflow-pattern=negated-unsigned-const
101     entry_sequence_id_out = sequence_id_ - 1;
102   } else if (!peek_status.ok()) {
103     // Discard the entry if the result isn't OK or OUT_OF_RANGE and exit, as the
104     // entry_sequence_id_out cannot be used for computation. Later invocations
105     // will calculate the drop count.
106     PW_CHECK(drain.reader_.PopFront().ok());
107     return peek_status;
108   }
109 
110   // Compute the drop count delta by comparing this entry's sequence ID with the
111   // last sequence ID this drain successfully read.
112   //
113   // The drop count calculation simply computes the difference between the
114   // current and last sequence IDs. Consecutive successful reads will always
115   // differ by one at least, so it is subtracted out. If the read was not
116   // successful, the difference is not adjusted.
117   drain_drop_count_out = entry_sequence_id_out -
118                          drain.last_handled_sequence_id_ -
119                          (peek_status.ok() ? 1 : 0);
120 
121   // Only report the ingress drop count when the drain catches up to where the
122   // drop happened, accounting only for the drops found and no more, as
123   // indicated by the gap in sequence IDs.
124   if (drain_drop_count_out > 0) {
125     ingress_drop_count_out =
126         std::min(drain_drop_count_out,
127                  total_ingress_drops_ - drain.last_handled_ingress_drop_count_);
128     // Remove the ingress drop count duplicated in drain_drop_count_out.
129     drain_drop_count_out -= ingress_drop_count_out;
130     // Check if all the ingress drops were reported.
131     drain.last_handled_ingress_drop_count_ =
132         total_ingress_drops_ > ingress_drop_count_out
133             ? total_ingress_drops_ - ingress_drop_count_out
134             : total_ingress_drops_;
135   }
136 
137   // The Peek above may have failed due to OutOfRange, now that we've set the
138   // drop count see if we should return before attempting to pop.
139   if (peek_status.IsOutOfRange()) {
140     // No more entries, update the drain.
141     drain.last_handled_sequence_id_ = entry_sequence_id_out;
142     return peek_status;
143   }
144   if (request == Request::kPop) {
145     PW_CHECK(drain.reader_.PopFront().ok());
146     drain.last_handled_sequence_id_ = entry_sequence_id_out;
147   }
148   return as_bytes(buffer.first(bytes_read));
149 }
150 
AttachDrain(Drain & drain)151 void MultiSink::AttachDrain(Drain& drain) {
152   std::lock_guard lock(lock_);
153   PW_DCHECK_PTR_EQ(drain.multisink_, nullptr);
154   drain.multisink_ = this;
155 
156   PW_CHECK_OK(ring_buffer_.AttachReader(drain.reader_));
157   if (&drain == &oldest_entry_drain_) {
158     // Negation overflow is by design.
159     /// -fsanitize-undefined-ignore-overflow-pattern=negated-unsigned-const
160     drain.last_handled_sequence_id_ = sequence_id_ - 1;
161   } else {
162     drain.last_handled_sequence_id_ =
163         oldest_entry_drain_.last_handled_sequence_id_;
164   }
165   drain.last_peek_sequence_id_ = drain.last_handled_sequence_id_;
166   drain.last_handled_ingress_drop_count_ = 0;
167 }
168 
DetachDrain(Drain & drain)169 void MultiSink::DetachDrain(Drain& drain) {
170   std::lock_guard lock(lock_);
171   PW_DCHECK_PTR_EQ(drain.multisink_, this);
172   drain.multisink_ = nullptr;
173   PW_CHECK_OK(ring_buffer_.DetachReader(drain.reader_),
174               "The drain wasn't already attached.");
175 }
176 
AttachListener(Listener & listener)177 void MultiSink::AttachListener(Listener& listener) {
178   std::lock_guard lock(lock_);
179   listeners_.push_back(listener);
180   // Notify the newly added entry, in case there are items in the sink.
181   listener.OnNewEntryAvailable();
182 }
183 
DetachListener(Listener & listener)184 void MultiSink::DetachListener(Listener& listener) {
185   std::lock_guard lock(lock_);
186   [[maybe_unused]] bool was_detached = listeners_.remove(listener);
187   PW_DCHECK(was_detached, "The listener was already attached.");
188 }
189 
Clear()190 void MultiSink::Clear() {
191   std::lock_guard lock(lock_);
192   ring_buffer_.Clear();
193 }
194 
NotifyListeners()195 void MultiSink::NotifyListeners() {
196   for (auto& listener : listeners_) {
197     listener.OnNewEntryAvailable();
198   }
199 }
200 
UnsafeForEachEntry(const Function<void (ConstByteSpan)> & callback,size_t max_num_entries)201 Status MultiSink::UnsafeForEachEntry(
202     const Function<void(ConstByteSpan)>& callback, size_t max_num_entries) {
203   MultiSink::UnsafeIterationWrapper multisink_iteration = UnsafeIteration();
204 
205   // First count the number of entries.
206   size_t num_entries = 0;
207   for ([[maybe_unused]] ConstByteSpan entry : multisink_iteration) {
208     num_entries++;
209   }
210 
211   // Log up to the max number of logs to avoid overflowing the crash log
212   // writer.
213   const size_t first_logged_offset =
214       max_num_entries > num_entries ? 0 : num_entries - max_num_entries;
215   pw::multisink::MultiSink::iterator it = multisink_iteration.begin();
216   for (size_t offset = 0; it != multisink_iteration.end(); ++it, ++offset) {
217     if (offset < first_logged_offset) {
218       continue;  // Skip this log.
219     }
220     callback(*it);
221   }
222   if (!it.status().ok()) {
223     PW_LOG_WARN("Multisink corruption detected, some entries may be missing");
224     return Status::DataLoss();
225   }
226 
227   return OkStatus();
228 }
229 
UnsafeForEachEntryFromEnd(const Function<void (ConstByteSpan)> & callback,size_t max_size_bytes)230 Status MultiSink::UnsafeForEachEntryFromEnd(
231     const Function<void(ConstByteSpan)>& callback, size_t max_size_bytes) {
232   MultiSink::UnsafeIterationWrapper multisink_iteration = UnsafeIteration();
233 
234   // First count the number of entries and total size of the entries.
235   size_t num_entries = 0;
236   size_t total_bytes = 0;
237   iterator it = multisink_iteration.begin();
238   iterator last_elem_it;
239   for (; it != multisink_iteration.end(); ++it) {
240     num_entries++;
241     total_bytes += (*it).size();
242     last_elem_it = it;
243   }
244 
245   size_t max_num_entries = std::numeric_limits<size_t>::max();
246   // All entries won't fit in the available space, so reverse iterate
247   // from the end to calculate the number of elements from the end
248   // which will fit in the available space.
249   if (total_bytes > max_size_bytes) {
250     total_bytes = 0;
251     max_num_entries = 0;
252     while (total_bytes <= max_size_bytes) {
253       total_bytes += (*last_elem_it).size();
254       last_elem_it--;
255       max_num_entries++;
256     }
257   }
258 
259   // Log up to the max number of logs to avoid overflowing the crash log
260   // writer.
261   const size_t first_logged_offset =
262       max_num_entries > num_entries ? 0 : num_entries - max_num_entries;
263   it = multisink_iteration.begin();
264   for (size_t offset = 0; it != multisink_iteration.end(); ++it, ++offset) {
265     if (offset < first_logged_offset) {
266       continue;  // Skip this log.
267     }
268     callback(*it);
269   }
270   if (!it.status().ok()) {
271     PW_LOG_WARN("Multisink corruption detected, some entries may be missing");
272     return Status::DataLoss();
273   }
274 
275   return OkStatus();
276 }
277 
PopEntry(const PeekedEntry & entry)278 Status MultiSink::Drain::PopEntry(const PeekedEntry& entry) {
279   PW_DCHECK_NOTNULL(multisink_);
280   return multisink_->PopEntry(*this, entry);
281 }
282 
PeekEntry(ByteSpan buffer,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out)283 Result<MultiSink::Drain::PeekedEntry> MultiSink::Drain::PeekEntry(
284     ByteSpan buffer,
285     uint32_t& drain_drop_count_out,
286     uint32_t& ingress_drop_count_out) {
287   PW_DCHECK_NOTNULL(multisink_);
288   uint32_t entry_sequence_id_out;
289   Result<ConstByteSpan> peek_result =
290       multisink_->PeekOrPopEntry(*this,
291                                  buffer,
292                                  Request::kPeek,
293                                  drain_drop_count_out,
294                                  ingress_drop_count_out,
295                                  entry_sequence_id_out);
296   if (!peek_result.ok()) {
297     return peek_result.status();
298   }
299   return PeekedEntry(peek_result.value(), entry_sequence_id_out);
300 }
301 
PopEntry(ByteSpan buffer,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out)302 Result<ConstByteSpan> MultiSink::Drain::PopEntry(
303     ByteSpan buffer,
304     uint32_t& drain_drop_count_out,
305     uint32_t& ingress_drop_count_out) {
306   PW_DCHECK_NOTNULL(multisink_);
307   uint32_t entry_sequence_id_out;
308   return multisink_->PeekOrPopEntry(*this,
309                                     buffer,
310                                     Request::kPop,
311                                     drain_drop_count_out,
312                                     ingress_drop_count_out,
313                                     entry_sequence_id_out);
314 }
315 
316 }  // namespace multisink
317 }  // namespace pw
318