1 /*
2 * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "net/dcsctp/tx/stream_scheduler.h"
11
12 #include <algorithm>
13
14 #include "absl/algorithm/container.h"
15 #include "absl/types/optional.h"
16 #include "api/array_view.h"
17 #include "net/dcsctp/common/str_join.h"
18 #include "net/dcsctp/packet/data.h"
19 #include "net/dcsctp/public/dcsctp_message.h"
20 #include "net/dcsctp/public/dcsctp_socket.h"
21 #include "net/dcsctp/public/types.h"
22 #include "net/dcsctp/tx/send_queue.h"
23 #include "rtc_base/checks.h"
24 #include "rtc_base/logging.h"
25
26 namespace dcsctp {
27
SetPriority(StreamPriority priority)28 void StreamScheduler::Stream::SetPriority(StreamPriority priority) {
29 priority_ = priority;
30 inverse_weight_ = InverseWeight(priority);
31 }
32
Produce(TimeMs now,size_t max_size)33 absl::optional<SendQueue::DataToSend> StreamScheduler::Produce(
34 TimeMs now,
35 size_t max_size) {
36 // For non-interleaved streams, avoid rescheduling while still sending a
37 // message as it needs to be sent in full. For interleaved messaging,
38 // reschedule for every I-DATA chunk sent.
39 bool rescheduling =
40 enable_message_interleaving_ || !currently_sending_a_message_;
41
42 RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling
43 << ", active="
44 << StrJoin(active_streams_, ", ",
45 [&](rtc::StringBuilder& sb, const auto& p) {
46 sb << *p->stream_id() << "@"
47 << *p->next_finish_time();
48 });
49
50 RTC_DCHECK(rescheduling || current_stream_ != nullptr);
51
52 absl::optional<SendQueue::DataToSend> data;
53 while (!data.has_value() && !active_streams_.empty()) {
54 if (rescheduling) {
55 auto it = active_streams_.begin();
56 current_stream_ = *it;
57 RTC_DLOG(LS_VERBOSE) << "Rescheduling to stream "
58 << *current_stream_->stream_id();
59
60 active_streams_.erase(it);
61 current_stream_->ForceMarkInactive();
62 } else {
63 RTC_DLOG(LS_VERBOSE) << "Producing from previous stream: "
64 << *current_stream_->stream_id();
65 RTC_DCHECK(absl::c_any_of(active_streams_, [this](const auto* p) {
66 return p == current_stream_;
67 }));
68 }
69
70 data = current_stream_->Produce(now, max_size);
71 }
72
73 if (!data.has_value()) {
74 RTC_DLOG(LS_VERBOSE)
75 << "There is no stream with data; Can't produce any data.";
76 RTC_DCHECK(IsConsistent());
77
78 return absl::nullopt;
79 }
80
81 RTC_DCHECK(data->data.stream_id == current_stream_->stream_id());
82
83 RTC_DLOG(LS_VERBOSE) << "Producing DATA, type="
84 << (data->data.is_unordered ? "unordered" : "ordered")
85 << "::"
86 << (*data->data.is_beginning && *data->data.is_end
87 ? "complete"
88 : *data->data.is_beginning ? "first"
89 : *data->data.is_end ? "last"
90 : "middle")
91 << ", stream_id=" << *current_stream_->stream_id()
92 << ", ppid=" << *data->data.ppid
93 << ", length=" << data->data.payload.size();
94
95 currently_sending_a_message_ = !*data->data.is_end;
96 virtual_time_ = current_stream_->current_time();
97
98 // One side-effect of rescheduling is that the new stream will not be present
99 // in `active_streams`.
100 size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message();
101 if (rescheduling && bytes_to_send_next > 0) {
102 current_stream_->MakeActive(bytes_to_send_next);
103 } else if (!rescheduling && bytes_to_send_next == 0) {
104 current_stream_->MakeInactive();
105 }
106
107 RTC_DCHECK(IsConsistent());
108 return data;
109 }
110
CalculateFinishTime(size_t bytes_to_send_next) const111 StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime(
112 size_t bytes_to_send_next) const {
113 if (parent_.enable_message_interleaving_) {
114 // Perform weighted fair queuing scheduling.
115 return VirtualTime(*current_virtual_time_ +
116 bytes_to_send_next * *inverse_weight_);
117 }
118
119 // Perform round-robin scheduling by letting the stream have its next virtual
120 // finish time in the future. It doesn't matter how far into the future, just
121 // any positive number so that any other stream that has the same virtual
122 // finish time as this stream gets to produce their data before revisiting
123 // this stream.
124 return VirtualTime(*current_virtual_time_ + 1);
125 }
126
Produce(TimeMs now,size_t max_size)127 absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
128 TimeMs now,
129 size_t max_size) {
130 absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
131
132 if (data.has_value()) {
133 VirtualTime new_current = CalculateFinishTime(data->data.payload.size());
134 RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_
135 << " -> " << *new_current;
136 current_virtual_time_ = new_current;
137 }
138
139 return data;
140 }
141
IsConsistent() const142 bool StreamScheduler::IsConsistent() const {
143 for (Stream* stream : active_streams_) {
144 if (stream->next_finish_time_ == VirtualTime::Zero()) {
145 RTC_DLOG(LS_VERBOSE) << "Stream " << *stream->stream_id()
146 << " is active, but has no next-finish-time";
147 return false;
148 }
149 }
150 return true;
151 }
152
MaybeMakeActive()153 void StreamScheduler::Stream::MaybeMakeActive() {
154 RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")";
155 RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
156 size_t bytes_to_send_next = bytes_to_send_in_next_message();
157 if (bytes_to_send_next == 0) {
158 return;
159 }
160
161 MakeActive(bytes_to_send_next);
162 }
163
MakeActive(size_t bytes_to_send_next)164 void StreamScheduler::Stream::MakeActive(size_t bytes_to_send_next) {
165 current_virtual_time_ = parent_.virtual_time_;
166 RTC_DCHECK_GT(bytes_to_send_next, 0);
167 VirtualTime next_finish_time = CalculateFinishTime(
168 std::min(bytes_to_send_next, parent_.max_payload_bytes_));
169 RTC_DCHECK_GT(*next_finish_time, 0);
170 RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id()
171 << " active, expiring at " << *next_finish_time;
172 RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
173 next_finish_time_ = next_finish_time;
174 RTC_DCHECK(!absl::c_any_of(parent_.active_streams_,
175 [this](const auto* p) { return p == this; }));
176 parent_.active_streams_.emplace(this);
177 }
178
ForceMarkInactive()179 void StreamScheduler::Stream::ForceMarkInactive() {
180 RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " inactive";
181 RTC_DCHECK(next_finish_time_ != VirtualTime::Zero());
182 next_finish_time_ = VirtualTime::Zero();
183 }
184
MakeInactive()185 void StreamScheduler::Stream::MakeInactive() {
186 ForceMarkInactive();
187 webrtc::EraseIf(parent_.active_streams_,
188 [&](const auto* s) { return s == this; });
189 }
190
ActiveStreamsForTesting() const191 std::set<StreamID> StreamScheduler::ActiveStreamsForTesting() const {
192 std::set<StreamID> stream_ids;
193 for (const auto& stream : active_streams_) {
194 stream_ids.insert(stream->stream_id());
195 }
196 return stream_ids;
197 }
198
199 } // namespace dcsctp
200