xref: /aosp_15_r20/external/webrtc/net/dcsctp/tx/stream_scheduler.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "net/dcsctp/tx/stream_scheduler.h"
11 
12 #include <algorithm>
13 
14 #include "absl/algorithm/container.h"
15 #include "absl/types/optional.h"
16 #include "api/array_view.h"
17 #include "net/dcsctp/common/str_join.h"
18 #include "net/dcsctp/packet/data.h"
19 #include "net/dcsctp/public/dcsctp_message.h"
20 #include "net/dcsctp/public/dcsctp_socket.h"
21 #include "net/dcsctp/public/types.h"
22 #include "net/dcsctp/tx/send_queue.h"
23 #include "rtc_base/checks.h"
24 #include "rtc_base/logging.h"
25 
26 namespace dcsctp {
27 
SetPriority(StreamPriority priority)28 void StreamScheduler::Stream::SetPriority(StreamPriority priority) {
29   priority_ = priority;
30   inverse_weight_ = InverseWeight(priority);
31 }
32 
Produce(TimeMs now,size_t max_size)33 absl::optional<SendQueue::DataToSend> StreamScheduler::Produce(
34     TimeMs now,
35     size_t max_size) {
36   // For non-interleaved streams, avoid rescheduling while still sending a
37   // message as it needs to be sent in full. For interleaved messaging,
38   // reschedule for every I-DATA chunk sent.
39   bool rescheduling =
40       enable_message_interleaving_ || !currently_sending_a_message_;
41 
42   RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling
43                       << ", active="
44                       << StrJoin(active_streams_, ", ",
45                                  [&](rtc::StringBuilder& sb, const auto& p) {
46                                    sb << *p->stream_id() << "@"
47                                       << *p->next_finish_time();
48                                  });
49 
50   RTC_DCHECK(rescheduling || current_stream_ != nullptr);
51 
52   absl::optional<SendQueue::DataToSend> data;
53   while (!data.has_value() && !active_streams_.empty()) {
54     if (rescheduling) {
55       auto it = active_streams_.begin();
56       current_stream_ = *it;
57       RTC_DLOG(LS_VERBOSE) << "Rescheduling to stream "
58                            << *current_stream_->stream_id();
59 
60       active_streams_.erase(it);
61       current_stream_->ForceMarkInactive();
62     } else {
63       RTC_DLOG(LS_VERBOSE) << "Producing from previous stream: "
64                            << *current_stream_->stream_id();
65       RTC_DCHECK(absl::c_any_of(active_streams_, [this](const auto* p) {
66         return p == current_stream_;
67       }));
68     }
69 
70     data = current_stream_->Produce(now, max_size);
71   }
72 
73   if (!data.has_value()) {
74     RTC_DLOG(LS_VERBOSE)
75         << "There is no stream with data; Can't produce any data.";
76     RTC_DCHECK(IsConsistent());
77 
78     return absl::nullopt;
79   }
80 
81   RTC_DCHECK(data->data.stream_id == current_stream_->stream_id());
82 
83   RTC_DLOG(LS_VERBOSE) << "Producing DATA, type="
84                        << (data->data.is_unordered ? "unordered" : "ordered")
85                        << "::"
86                        << (*data->data.is_beginning && *data->data.is_end
87                                ? "complete"
88                            : *data->data.is_beginning ? "first"
89                            : *data->data.is_end       ? "last"
90                                                       : "middle")
91                        << ", stream_id=" << *current_stream_->stream_id()
92                        << ", ppid=" << *data->data.ppid
93                        << ", length=" << data->data.payload.size();
94 
95   currently_sending_a_message_ = !*data->data.is_end;
96   virtual_time_ = current_stream_->current_time();
97 
98   // One side-effect of rescheduling is that the new stream will not be present
99   // in `active_streams`.
100   size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message();
101   if (rescheduling && bytes_to_send_next > 0) {
102     current_stream_->MakeActive(bytes_to_send_next);
103   } else if (!rescheduling && bytes_to_send_next == 0) {
104     current_stream_->MakeInactive();
105   }
106 
107   RTC_DCHECK(IsConsistent());
108   return data;
109 }
110 
CalculateFinishTime(size_t bytes_to_send_next) const111 StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime(
112     size_t bytes_to_send_next) const {
113   if (parent_.enable_message_interleaving_) {
114     // Perform weighted fair queuing scheduling.
115     return VirtualTime(*current_virtual_time_ +
116                        bytes_to_send_next * *inverse_weight_);
117   }
118 
119   // Perform round-robin scheduling by letting the stream have its next virtual
120   // finish time in the future. It doesn't matter how far into the future, just
121   // any positive number so that any other stream that has the same virtual
122   // finish time as this stream gets to produce their data before revisiting
123   // this stream.
124   return VirtualTime(*current_virtual_time_ + 1);
125 }
126 
Produce(TimeMs now,size_t max_size)127 absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
128     TimeMs now,
129     size_t max_size) {
130   absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
131 
132   if (data.has_value()) {
133     VirtualTime new_current = CalculateFinishTime(data->data.payload.size());
134     RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_
135                          << " -> " << *new_current;
136     current_virtual_time_ = new_current;
137   }
138 
139   return data;
140 }
141 
IsConsistent() const142 bool StreamScheduler::IsConsistent() const {
143   for (Stream* stream : active_streams_) {
144     if (stream->next_finish_time_ == VirtualTime::Zero()) {
145       RTC_DLOG(LS_VERBOSE) << "Stream " << *stream->stream_id()
146                            << " is active, but has no next-finish-time";
147       return false;
148     }
149   }
150   return true;
151 }
152 
MaybeMakeActive()153 void StreamScheduler::Stream::MaybeMakeActive() {
154   RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")";
155   RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
156   size_t bytes_to_send_next = bytes_to_send_in_next_message();
157   if (bytes_to_send_next == 0) {
158     return;
159   }
160 
161   MakeActive(bytes_to_send_next);
162 }
163 
MakeActive(size_t bytes_to_send_next)164 void StreamScheduler::Stream::MakeActive(size_t bytes_to_send_next) {
165   current_virtual_time_ = parent_.virtual_time_;
166   RTC_DCHECK_GT(bytes_to_send_next, 0);
167   VirtualTime next_finish_time = CalculateFinishTime(
168       std::min(bytes_to_send_next, parent_.max_payload_bytes_));
169   RTC_DCHECK_GT(*next_finish_time, 0);
170   RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id()
171                        << " active, expiring at " << *next_finish_time;
172   RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
173   next_finish_time_ = next_finish_time;
174   RTC_DCHECK(!absl::c_any_of(parent_.active_streams_,
175                              [this](const auto* p) { return p == this; }));
176   parent_.active_streams_.emplace(this);
177 }
178 
ForceMarkInactive()179 void StreamScheduler::Stream::ForceMarkInactive() {
180   RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " inactive";
181   RTC_DCHECK(next_finish_time_ != VirtualTime::Zero());
182   next_finish_time_ = VirtualTime::Zero();
183 }
184 
MakeInactive()185 void StreamScheduler::Stream::MakeInactive() {
186   ForceMarkInactive();
187   webrtc::EraseIf(parent_.active_streams_,
188                   [&](const auto* s) { return s == this; });
189 }
190 
ActiveStreamsForTesting() const191 std::set<StreamID> StreamScheduler::ActiveStreamsForTesting() const {
192   std::set<StreamID> stream_ids;
193   for (const auto& stream : active_streams_) {
194     stream_ids.insert(stream->stream_id());
195   }
196   return stream_ids;
197 }
198 
199 }  // namespace dcsctp
200