1 /* 2 * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 #ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_ 11 #define NET_DCSCTP_TX_STREAM_SCHEDULER_H_ 12 13 #include <algorithm> 14 #include <cstdint> 15 #include <deque> 16 #include <map> 17 #include <memory> 18 #include <queue> 19 #include <set> 20 #include <string> 21 #include <utility> 22 23 #include "absl/algorithm/container.h" 24 #include "absl/memory/memory.h" 25 #include "absl/strings/string_view.h" 26 #include "absl/types/optional.h" 27 #include "api/array_view.h" 28 #include "net/dcsctp/packet/chunk/idata_chunk.h" 29 #include "net/dcsctp/packet/sctp_packet.h" 30 #include "net/dcsctp/public/dcsctp_message.h" 31 #include "net/dcsctp/public/dcsctp_socket.h" 32 #include "net/dcsctp/public/types.h" 33 #include "net/dcsctp/tx/send_queue.h" 34 #include "rtc_base/containers/flat_set.h" 35 #include "rtc_base/strong_alias.h" 36 37 namespace dcsctp { 38 39 // A parameterized stream scheduler. Currently, it implements the round robin 40 // scheduling algorithm using virtual finish time. It is to be used as a part of 41 // a send queue and will track all active streams (streams that have any data 42 // that can be sent). 43 // 44 // The stream scheduler works with the concept of associating active streams 45 // with a "virtual finish time", which is the time when a stream is allowed to 46 // produce data. Streams are ordered by their virtual finish time, and the 47 // "current virtual time" will advance to the next following virtual finish time 48 // whenever a chunk is to be produced. 49 // 50 // When message interleaving is enabled, the WFQ - Weighted Fair Queueing - 51 // scheduling algorithm will be used. And when it's not, round-robin scheduling 52 // will be used instead. 53 // 54 // In the round robin scheduling algorithm, a stream's virtual finish time will 55 // just increment by one (1) after having produced a chunk, which results in a 56 // round-robin scheduling. 57 // 58 // In WFQ scheduling algorithm, a stream's virtual finish time will be defined 59 // as the number of bytes in the next fragment to be sent, multiplied by the 60 // inverse of the stream's priority, meaning that a high priority - or a smaller 61 // fragment - results in a closer virtual finish time, compared to a stream with 62 // either a lower priority or a larger fragment to be sent. 63 class StreamScheduler { 64 private: 65 class VirtualTime : public webrtc::StrongAlias<class VirtualTimeTag, double> { 66 public: VirtualTime(const UnderlyingType & v)67 constexpr explicit VirtualTime(const UnderlyingType& v) 68 : webrtc::StrongAlias<class VirtualTimeTag, double>(v) {} 69 Zero()70 static constexpr VirtualTime Zero() { return VirtualTime(0); } 71 }; 72 class InverseWeight 73 : public webrtc::StrongAlias<class InverseWeightTag, double> { 74 public: InverseWeight(StreamPriority priority)75 constexpr explicit InverseWeight(StreamPriority priority) 76 : webrtc::StrongAlias<class InverseWeightTag, double>( 77 1.0 / std::max(static_cast<double>(*priority), 0.000001)) {} 78 }; 79 80 public: 81 class StreamProducer { 82 public: 83 virtual ~StreamProducer() = default; 84 85 // Produces a fragment of data to send. The current wall time is specified 86 // as `now` and should be used to skip chunks with expired limited lifetime. 87 // The parameter `max_size` specifies the maximum amount of actual payload 88 // that may be returned. If these constraints prevents the stream from 89 // sending some data, `absl::nullopt` should be returned. 90 virtual absl::optional<SendQueue::DataToSend> Produce(TimeMs now, 91 size_t max_size) = 0; 92 93 // Returns the number of payload bytes that is scheduled to be sent in the 94 // next enqueued message, or zero if there are no enqueued messages or if 95 // the stream has been actively paused. 96 virtual size_t bytes_to_send_in_next_message() const = 0; 97 }; 98 99 class Stream { 100 public: stream_id()101 StreamID stream_id() const { return stream_id_; } 102 priority()103 StreamPriority priority() const { return priority_; } 104 void SetPriority(StreamPriority priority); 105 106 // Will activate the stream _if_ it has any data to send. That is, if the 107 // callback to `bytes_to_send_in_next_message` returns non-zero. If the 108 // callback returns zero, the stream will not be made active. 109 void MaybeMakeActive(); 110 111 // Will remove the stream from the list of active streams, and will not try 112 // to produce data from it. To make it active again, call `MaybeMakeActive`. 113 void MakeInactive(); 114 115 // Make the scheduler move to another message, or another stream. This is 116 // used to abort the scheduler from continuing producing fragments for the 117 // current message in case it's deleted. ForceReschedule()118 void ForceReschedule() { parent_.ForceReschedule(); } 119 120 private: 121 friend class StreamScheduler; 122 Stream(StreamScheduler * parent,StreamProducer * producer,StreamID stream_id,StreamPriority priority)123 Stream(StreamScheduler* parent, 124 StreamProducer* producer, 125 StreamID stream_id, 126 StreamPriority priority) 127 : parent_(*parent), 128 producer_(*producer), 129 stream_id_(stream_id), 130 priority_(priority), 131 inverse_weight_(priority) {} 132 133 // Produces a message from this stream. This will only be called on streams 134 // that have data. 135 absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size); 136 137 void MakeActive(size_t bytes_to_send_next); 138 void ForceMarkInactive(); 139 current_time()140 VirtualTime current_time() const { return current_virtual_time_; } next_finish_time()141 VirtualTime next_finish_time() const { return next_finish_time_; } bytes_to_send_in_next_message()142 size_t bytes_to_send_in_next_message() const { 143 return producer_.bytes_to_send_in_next_message(); 144 } 145 146 VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const; 147 148 StreamScheduler& parent_; 149 StreamProducer& producer_; 150 const StreamID stream_id_; 151 StreamPriority priority_; 152 InverseWeight inverse_weight_; 153 // This outgoing stream's "current" virtual_time. 154 VirtualTime current_virtual_time_ = VirtualTime::Zero(); 155 VirtualTime next_finish_time_ = VirtualTime::Zero(); 156 }; 157 158 // The `mtu` parameter represents the maximum SCTP packet size, which should 159 // be the same as `DcSctpOptions::mtu`. StreamScheduler(size_t mtu)160 explicit StreamScheduler(size_t mtu) 161 : max_payload_bytes_(mtu - SctpPacket::kHeaderSize - 162 IDataChunk::kHeaderSize) {} 163 CreateStream(StreamProducer * producer,StreamID stream_id,StreamPriority priority)164 std::unique_ptr<Stream> CreateStream(StreamProducer* producer, 165 StreamID stream_id, 166 StreamPriority priority) { 167 return absl::WrapUnique(new Stream(this, producer, stream_id, priority)); 168 } 169 EnableMessageInterleaving(bool enabled)170 void EnableMessageInterleaving(bool enabled) { 171 enable_message_interleaving_ = enabled; 172 } 173 174 // Makes the scheduler stop producing message from the current stream and 175 // re-evaluates which stream to produce from. ForceReschedule()176 void ForceReschedule() { currently_sending_a_message_ = false; } 177 178 // Produces a fragment of data to send. The current wall time is specified as 179 // `now` and will be used to skip chunks with expired limited lifetime. The 180 // parameter `max_size` specifies the maximum amount of actual payload that 181 // may be returned. If no data can be produced, `absl::nullopt` is returned. 182 absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size); 183 184 std::set<StreamID> ActiveStreamsForTesting() const; 185 186 private: 187 struct ActiveStreamComparator { 188 // Ordered by virtual finish time (primary), stream-id (secondary). operatorActiveStreamComparator189 bool operator()(Stream* a, Stream* b) const { 190 VirtualTime a_vft = a->next_finish_time(); 191 VirtualTime b_vft = b->next_finish_time(); 192 if (a_vft == b_vft) { 193 return a->stream_id() < b->stream_id(); 194 } 195 return a_vft < b_vft; 196 } 197 }; 198 199 bool IsConsistent() const; 200 201 const size_t max_payload_bytes_; 202 203 // The current virtual time, as defined in the WFQ algorithm. 204 VirtualTime virtual_time_ = VirtualTime::Zero(); 205 206 // The current stream to send chunks from. 207 Stream* current_stream_ = nullptr; 208 209 bool enable_message_interleaving_ = false; 210 211 // Indicates if the streams is currently sending a message, and should then 212 // - if message interleaving is not enabled - continue sending from this 213 // stream until that message has been sent in full. 214 bool currently_sending_a_message_ = false; 215 216 // The currently active streams, ordered by virtual finish time. 217 webrtc::flat_set<Stream*, ActiveStreamComparator> active_streams_; 218 }; 219 220 } // namespace dcsctp 221 222 #endif // NET_DCSCTP_TX_STREAM_SCHEDULER_H_ 223