xref: /aosp_15_r20/external/webrtc/net/dcsctp/tx/rr_send_queue.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "net/dcsctp/tx/rr_send_queue.h"
11 
12 #include <cstdint>
13 #include <deque>
14 #include <limits>
15 #include <map>
16 #include <set>
17 #include <utility>
18 #include <vector>
19 
20 #include "absl/algorithm/container.h"
21 #include "absl/types/optional.h"
22 #include "api/array_view.h"
23 #include "net/dcsctp/common/str_join.h"
24 #include "net/dcsctp/packet/data.h"
25 #include "net/dcsctp/public/dcsctp_message.h"
26 #include "net/dcsctp/public/dcsctp_socket.h"
27 #include "net/dcsctp/public/types.h"
28 #include "net/dcsctp/tx/send_queue.h"
29 #include "rtc_base/logging.h"
30 
31 namespace dcsctp {
32 
RRSendQueue(absl::string_view log_prefix,DcSctpSocketCallbacks * callbacks,size_t buffer_size,size_t mtu,StreamPriority default_priority,size_t total_buffered_amount_low_threshold)33 RRSendQueue::RRSendQueue(absl::string_view log_prefix,
34                          DcSctpSocketCallbacks* callbacks,
35                          size_t buffer_size,
36                          size_t mtu,
37                          StreamPriority default_priority,
38                          size_t total_buffered_amount_low_threshold)
39     : log_prefix_(std::string(log_prefix) + "fcfs: "),
40       callbacks_(*callbacks),
41       buffer_size_(buffer_size),
42       default_priority_(default_priority),
43       scheduler_(mtu),
44       total_buffered_amount_(
45           [this]() { callbacks_.OnTotalBufferedAmountLow(); }) {
46   total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
47 }
48 
bytes_to_send_in_next_message() const49 size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
50   if (pause_state_ == PauseState::kPaused ||
51       pause_state_ == PauseState::kResetting) {
52     // The stream has paused (and there is no partially sent message).
53     return 0;
54   }
55 
56   if (items_.empty()) {
57     return 0;
58   }
59 
60   return items_.front().remaining_size;
61 }
62 
AddHandoverState(DcSctpSocketHandoverState::OutgoingStream & state) const63 void RRSendQueue::OutgoingStream::AddHandoverState(
64     DcSctpSocketHandoverState::OutgoingStream& state) const {
65   state.next_ssn = next_ssn_.value();
66   state.next_ordered_mid = next_ordered_mid_.value();
67   state.next_unordered_mid = next_unordered_mid_.value();
68   state.priority = *scheduler_stream_->priority();
69 }
70 
IsConsistent() const71 bool RRSendQueue::IsConsistent() const {
72   std::set<StreamID> expected_active_streams;
73   std::set<StreamID> actual_active_streams =
74       scheduler_.ActiveStreamsForTesting();
75 
76   size_t total_buffered_amount = 0;
77   for (const auto& [stream_id, stream] : streams_) {
78     total_buffered_amount += stream.buffered_amount().value();
79     if (stream.bytes_to_send_in_next_message() > 0) {
80       expected_active_streams.emplace(stream_id);
81     }
82   }
83   if (expected_active_streams != actual_active_streams) {
84     auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
85     RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
86                        << StrJoin(actual_active_streams, ",", fn)
87                        << "], expected=["
88                        << StrJoin(expected_active_streams, ",", fn) << "]";
89     return false;
90   }
91 
92   return total_buffered_amount == total_buffered_amount_.value();
93 }
94 
IsConsistent() const95 bool RRSendQueue::OutgoingStream::IsConsistent() const {
96   size_t bytes = 0;
97   for (const auto& item : items_) {
98     bytes += item.remaining_size;
99   }
100   return bytes == buffered_amount_.value();
101 }
102 
Decrease(size_t bytes)103 void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
104   RTC_DCHECK(bytes <= value_);
105   size_t old_value = value_;
106   value_ -= bytes;
107 
108   if (old_value > low_threshold_ && value_ <= low_threshold_) {
109     on_threshold_reached_();
110   }
111 }
112 
SetLowThreshold(size_t low_threshold)113 void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
114   // Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted.
115   if (low_threshold_ < value_ && low_threshold >= value_) {
116     on_threshold_reached_();
117   }
118   low_threshold_ = low_threshold;
119 }
120 
Add(DcSctpMessage message,MessageAttributes attributes)121 void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
122                                       MessageAttributes attributes) {
123   bool was_active = bytes_to_send_in_next_message() > 0;
124   buffered_amount_.Increase(message.payload().size());
125   parent_.total_buffered_amount_.Increase(message.payload().size());
126   items_.emplace_back(std::move(message), std::move(attributes));
127 
128   if (!was_active) {
129     scheduler_stream_->MaybeMakeActive();
130   }
131 
132   RTC_DCHECK(IsConsistent());
133 }
134 
Produce(TimeMs now,size_t max_size)135 absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
136     TimeMs now,
137     size_t max_size) {
138   RTC_DCHECK(pause_state_ != PauseState::kPaused &&
139              pause_state_ != PauseState::kResetting);
140 
141   while (!items_.empty()) {
142     Item& item = items_.front();
143     DcSctpMessage& message = item.message;
144 
145     // Allocate Message ID and SSN when the first fragment is sent.
146     if (!item.message_id.has_value()) {
147       // Oops, this entire message has already expired. Try the next one.
148       if (item.attributes.expires_at <= now) {
149         HandleMessageExpired(item);
150         items_.pop_front();
151         continue;
152       }
153 
154       MID& mid =
155           item.attributes.unordered ? next_unordered_mid_ : next_ordered_mid_;
156       item.message_id = mid;
157       mid = MID(*mid + 1);
158     }
159     if (!item.attributes.unordered && !item.ssn.has_value()) {
160       item.ssn = next_ssn_;
161       next_ssn_ = SSN(*next_ssn_ + 1);
162     }
163 
164     // Grab the next `max_size` fragment from this message and calculate flags.
165     rtc::ArrayView<const uint8_t> chunk_payload =
166         item.message.payload().subview(item.remaining_offset, max_size);
167     rtc::ArrayView<const uint8_t> message_payload = message.payload();
168     Data::IsBeginning is_beginning(chunk_payload.data() ==
169                                    message_payload.data());
170     Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
171                        (message_payload.data() + message_payload.size()));
172 
173     StreamID stream_id = message.stream_id();
174     PPID ppid = message.ppid();
175 
176     // Zero-copy the payload if the message fits in a single chunk.
177     std::vector<uint8_t> payload =
178         is_beginning && is_end
179             ? std::move(message).ReleasePayload()
180             : std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
181 
182     FSN fsn(item.current_fsn);
183     item.current_fsn = FSN(*item.current_fsn + 1);
184     buffered_amount_.Decrease(payload.size());
185     parent_.total_buffered_amount_.Decrease(payload.size());
186 
187     SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)),
188                                      item.message_id.value(), fsn, ppid,
189                                      std::move(payload), is_beginning, is_end,
190                                      item.attributes.unordered));
191     chunk.max_retransmissions = item.attributes.max_retransmissions;
192     chunk.expires_at = item.attributes.expires_at;
193     chunk.lifecycle_id =
194         is_end ? item.attributes.lifecycle_id : LifecycleId::NotSet();
195 
196     if (is_end) {
197       // The entire message has been sent, and its last data copied to `chunk`,
198       // so it can safely be discarded.
199       items_.pop_front();
200 
201       if (pause_state_ == PauseState::kPending) {
202         RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id
203                              << " is moving from pending to paused";
204         pause_state_ = PauseState::kPaused;
205       }
206     } else {
207       item.remaining_offset += chunk_payload.size();
208       item.remaining_size -= chunk_payload.size();
209       RTC_DCHECK(item.remaining_offset + item.remaining_size ==
210                  item.message.payload().size());
211       RTC_DCHECK(item.remaining_size > 0);
212     }
213     RTC_DCHECK(IsConsistent());
214     return chunk;
215   }
216   RTC_DCHECK(IsConsistent());
217   return absl::nullopt;
218 }
219 
HandleMessageExpired(OutgoingStream::Item & item)220 void RRSendQueue::OutgoingStream::HandleMessageExpired(
221     OutgoingStream::Item& item) {
222   buffered_amount_.Decrease(item.remaining_size);
223   parent_.total_buffered_amount_.Decrease(item.remaining_size);
224   if (item.attributes.lifecycle_id.IsSet()) {
225     RTC_DLOG(LS_VERBOSE) << "Triggering OnLifecycleMessageExpired("
226                          << item.attributes.lifecycle_id.value() << ", false)";
227 
228     parent_.callbacks_.OnLifecycleMessageExpired(item.attributes.lifecycle_id,
229                                                  /*maybe_delivered=*/false);
230     parent_.callbacks_.OnLifecycleEnd(item.attributes.lifecycle_id);
231   }
232 }
233 
Discard(IsUnordered unordered,MID message_id)234 bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
235                                           MID message_id) {
236   bool result = false;
237   if (!items_.empty()) {
238     Item& item = items_.front();
239     if (item.attributes.unordered == unordered && item.message_id.has_value() &&
240         *item.message_id == message_id) {
241       HandleMessageExpired(item);
242       items_.pop_front();
243 
244       // Only partially sent messages are discarded, so if a message was
245       // discarded, then it was the currently sent message.
246       scheduler_stream_->ForceReschedule();
247 
248       if (pause_state_ == PauseState::kPending) {
249         pause_state_ = PauseState::kPaused;
250         scheduler_stream_->MakeInactive();
251       } else if (bytes_to_send_in_next_message() == 0) {
252         scheduler_stream_->MakeInactive();
253       }
254 
255       // As the item still existed, it had unsent data.
256       result = true;
257     }
258   }
259   RTC_DCHECK(IsConsistent());
260   return result;
261 }
262 
Pause()263 void RRSendQueue::OutgoingStream::Pause() {
264   if (pause_state_ != PauseState::kNotPaused) {
265     // Already in progress.
266     return;
267   }
268 
269   bool had_pending_items = !items_.empty();
270 
271   // https://datatracker.ietf.org/doc/html/rfc8831#section-6.7
272   // "Closing of a data channel MUST be signaled by resetting the corresponding
273   // outgoing streams [RFC6525].  This means that if one side decides to close
274   // the data channel, it resets the corresponding outgoing stream."
275   // ... "[RFC6525] also guarantees that all the messages are delivered (or
276   // abandoned) before the stream is reset."
277 
278   // A stream is paused when it's about to be reset. In this implementation,
279   // it will throw away all non-partially send messages - they will be abandoned
280   // as noted above. This is subject to change. It will however not discard any
281   // partially sent messages - only whole messages. Partially delivered messages
282   // (at the time of receiving a Stream Reset command) will always deliver all
283   // the fragments before actually resetting the stream.
284   for (auto it = items_.begin(); it != items_.end();) {
285     if (it->remaining_offset == 0) {
286       HandleMessageExpired(*it);
287       it = items_.erase(it);
288     } else {
289       ++it;
290     }
291   }
292 
293   pause_state_ = (items_.empty() || items_.front().remaining_offset == 0)
294                      ? PauseState::kPaused
295                      : PauseState::kPending;
296 
297   if (had_pending_items && pause_state_ == PauseState::kPaused) {
298     RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
299                          << " was previously active, but is now paused.";
300     scheduler_stream_->MakeInactive();
301   }
302 
303   RTC_DCHECK(IsConsistent());
304 }
305 
Resume()306 void RRSendQueue::OutgoingStream::Resume() {
307   RTC_DCHECK(pause_state_ == PauseState::kResetting);
308   pause_state_ = PauseState::kNotPaused;
309   scheduler_stream_->MaybeMakeActive();
310   RTC_DCHECK(IsConsistent());
311 }
312 
Reset()313 void RRSendQueue::OutgoingStream::Reset() {
314   // This can be called both when an outgoing stream reset has been responded
315   // to, or when the entire SendQueue is reset due to detecting the peer having
316   // restarted. The stream may be in any state at this time.
317   PauseState old_pause_state = pause_state_;
318   pause_state_ = PauseState::kNotPaused;
319   next_ordered_mid_ = MID(0);
320   next_unordered_mid_ = MID(0);
321   next_ssn_ = SSN(0);
322   if (!items_.empty()) {
323     // If this message has been partially sent, reset it so that it will be
324     // re-sent.
325     auto& item = items_.front();
326     buffered_amount_.Increase(item.message.payload().size() -
327                               item.remaining_size);
328     parent_.total_buffered_amount_.Increase(item.message.payload().size() -
329                                             item.remaining_size);
330     item.remaining_offset = 0;
331     item.remaining_size = item.message.payload().size();
332     item.message_id = absl::nullopt;
333     item.ssn = absl::nullopt;
334     item.current_fsn = FSN(0);
335     if (old_pause_state == PauseState::kPaused ||
336         old_pause_state == PauseState::kResetting) {
337       scheduler_stream_->MaybeMakeActive();
338     }
339   }
340   RTC_DCHECK(IsConsistent());
341 }
342 
has_partially_sent_message() const343 bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
344   if (items_.empty()) {
345     return false;
346   }
347   return items_.front().message_id.has_value();
348 }
349 
Add(TimeMs now,DcSctpMessage message,const SendOptions & send_options)350 void RRSendQueue::Add(TimeMs now,
351                       DcSctpMessage message,
352                       const SendOptions& send_options) {
353   RTC_DCHECK(!message.payload().empty());
354   // Any limited lifetime should start counting from now - when the message
355   // has been added to the queue.
356 
357   // `expires_at` is the time when it expires. Which is slightly larger than the
358   // message's lifetime, as the message is alive during its entire lifetime
359   // (which may be zero).
360   MessageAttributes attributes = {
361       .unordered = send_options.unordered,
362       .max_retransmissions =
363           send_options.max_retransmissions.has_value()
364               ? MaxRetransmits(send_options.max_retransmissions.value())
365               : MaxRetransmits::NoLimit(),
366       .expires_at = send_options.lifetime.has_value()
367                         ? now + *send_options.lifetime + DurationMs(1)
368                         : TimeMs::InfiniteFuture(),
369       .lifecycle_id = send_options.lifecycle_id,
370   };
371   GetOrCreateStreamInfo(message.stream_id())
372       .Add(std::move(message), std::move(attributes));
373   RTC_DCHECK(IsConsistent());
374 }
375 
IsFull() const376 bool RRSendQueue::IsFull() const {
377   return total_buffered_amount() >= buffer_size_;
378 }
379 
IsEmpty() const380 bool RRSendQueue::IsEmpty() const {
381   return total_buffered_amount() == 0;
382 }
383 
Produce(TimeMs now,size_t max_size)384 absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
385                                                            size_t max_size) {
386   return scheduler_.Produce(now, max_size);
387 }
388 
Discard(IsUnordered unordered,StreamID stream_id,MID message_id)389 bool RRSendQueue::Discard(IsUnordered unordered,
390                           StreamID stream_id,
391                           MID message_id) {
392   bool has_discarded =
393       GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
394 
395   RTC_DCHECK(IsConsistent());
396   return has_discarded;
397 }
398 
PrepareResetStream(StreamID stream_id)399 void RRSendQueue::PrepareResetStream(StreamID stream_id) {
400   GetOrCreateStreamInfo(stream_id).Pause();
401   RTC_DCHECK(IsConsistent());
402 }
403 
HasStreamsReadyToBeReset() const404 bool RRSendQueue::HasStreamsReadyToBeReset() const {
405   for (auto& [unused, stream] : streams_) {
406     if (stream.IsReadyToBeReset()) {
407       return true;
408     }
409   }
410   return false;
411 }
GetStreamsReadyToBeReset()412 std::vector<StreamID> RRSendQueue::GetStreamsReadyToBeReset() {
413   RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
414                return p.second.IsResetting();
415              }) == 0);
416   std::vector<StreamID> ready;
417   for (auto& [stream_id, stream] : streams_) {
418     if (stream.IsReadyToBeReset()) {
419       stream.SetAsResetting();
420       ready.push_back(stream_id);
421     }
422   }
423   return ready;
424 }
425 
CommitResetStreams()426 void RRSendQueue::CommitResetStreams() {
427   RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
428                return p.second.IsResetting();
429              }) > 0);
430   for (auto& [unused, stream] : streams_) {
431     if (stream.IsResetting()) {
432       stream.Reset();
433     }
434   }
435   RTC_DCHECK(IsConsistent());
436 }
437 
RollbackResetStreams()438 void RRSendQueue::RollbackResetStreams() {
439   RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
440                return p.second.IsResetting();
441              }) > 0);
442   for (auto& [unused, stream] : streams_) {
443     if (stream.IsResetting()) {
444       stream.Resume();
445     }
446   }
447   RTC_DCHECK(IsConsistent());
448 }
449 
Reset()450 void RRSendQueue::Reset() {
451   // Recalculate buffered amount, as partially sent messages may have been put
452   // fully back in the queue.
453   for (auto& [unused, stream] : streams_) {
454     stream.Reset();
455   }
456   scheduler_.ForceReschedule();
457 }
458 
buffered_amount(StreamID stream_id) const459 size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
460   auto it = streams_.find(stream_id);
461   if (it == streams_.end()) {
462     return 0;
463   }
464   return it->second.buffered_amount().value();
465 }
466 
buffered_amount_low_threshold(StreamID stream_id) const467 size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const {
468   auto it = streams_.find(stream_id);
469   if (it == streams_.end()) {
470     return 0;
471   }
472   return it->second.buffered_amount().low_threshold();
473 }
474 
SetBufferedAmountLowThreshold(StreamID stream_id,size_t bytes)475 void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id,
476                                                 size_t bytes) {
477   GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes);
478 }
479 
GetOrCreateStreamInfo(StreamID stream_id)480 RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
481     StreamID stream_id) {
482   auto it = streams_.find(stream_id);
483   if (it != streams_.end()) {
484     return it->second;
485   }
486 
487   return streams_
488       .emplace(
489           std::piecewise_construct, std::forward_as_tuple(stream_id),
490           std::forward_as_tuple(this, &scheduler_, stream_id, default_priority_,
491                                 [this, stream_id]() {
492                                   callbacks_.OnBufferedAmountLow(stream_id);
493                                 }))
494       .first->second;
495 }
496 
SetStreamPriority(StreamID stream_id,StreamPriority priority)497 void RRSendQueue::SetStreamPriority(StreamID stream_id,
498                                     StreamPriority priority) {
499   OutgoingStream& stream = GetOrCreateStreamInfo(stream_id);
500 
501   stream.SetPriority(priority);
502   RTC_DCHECK(IsConsistent());
503 }
504 
GetStreamPriority(StreamID stream_id) const505 StreamPriority RRSendQueue::GetStreamPriority(StreamID stream_id) const {
506   auto stream_it = streams_.find(stream_id);
507   if (stream_it == streams_.end()) {
508     return default_priority_;
509   }
510   return stream_it->second.priority();
511 }
512 
GetHandoverReadiness() const513 HandoverReadinessStatus RRSendQueue::GetHandoverReadiness() const {
514   HandoverReadinessStatus status;
515   if (!IsEmpty()) {
516     status.Add(HandoverUnreadinessReason::kSendQueueNotEmpty);
517   }
518   return status;
519 }
520 
AddHandoverState(DcSctpSocketHandoverState & state)521 void RRSendQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
522   for (const auto& [stream_id, stream] : streams_) {
523     DcSctpSocketHandoverState::OutgoingStream state_stream;
524     state_stream.id = stream_id.value();
525     stream.AddHandoverState(state_stream);
526     state.tx.streams.push_back(std::move(state_stream));
527   }
528 }
529 
RestoreFromState(const DcSctpSocketHandoverState & state)530 void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
531   for (const DcSctpSocketHandoverState::OutgoingStream& state_stream :
532        state.tx.streams) {
533     StreamID stream_id(state_stream.id);
534     streams_.emplace(
535         std::piecewise_construct, std::forward_as_tuple(stream_id),
536         std::forward_as_tuple(
537             this, &scheduler_, stream_id, StreamPriority(state_stream.priority),
538             [this, stream_id]() { callbacks_.OnBufferedAmountLow(stream_id); },
539             &state_stream));
540   }
541 }
542 }  // namespace dcsctp
543