xref: /aosp_15_r20/external/webrtc/net/dcsctp/tx/stream_scheduler.h (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_
11 #define NET_DCSCTP_TX_STREAM_SCHEDULER_H_
12 
13 #include <algorithm>
14 #include <cstdint>
15 #include <deque>
16 #include <map>
17 #include <memory>
18 #include <queue>
19 #include <set>
20 #include <string>
21 #include <utility>
22 
23 #include "absl/algorithm/container.h"
24 #include "absl/memory/memory.h"
25 #include "absl/strings/string_view.h"
26 #include "absl/types/optional.h"
27 #include "api/array_view.h"
28 #include "net/dcsctp/packet/chunk/idata_chunk.h"
29 #include "net/dcsctp/packet/sctp_packet.h"
30 #include "net/dcsctp/public/dcsctp_message.h"
31 #include "net/dcsctp/public/dcsctp_socket.h"
32 #include "net/dcsctp/public/types.h"
33 #include "net/dcsctp/tx/send_queue.h"
34 #include "rtc_base/containers/flat_set.h"
35 #include "rtc_base/strong_alias.h"
36 
37 namespace dcsctp {
38 
39 // A parameterized stream scheduler. Currently, it implements the round robin
40 // scheduling algorithm using virtual finish time. It is to be used as a part of
41 // a send queue and will track all active streams (streams that have any data
42 // that can be sent).
43 //
44 // The stream scheduler works with the concept of associating active streams
45 // with a "virtual finish time", which is the time when a stream is allowed to
46 // produce data. Streams are ordered by their virtual finish time, and the
47 // "current virtual time" will advance to the next following virtual finish time
48 // whenever a chunk is to be produced.
49 //
50 // When message interleaving is enabled, the WFQ - Weighted Fair Queueing -
51 // scheduling algorithm will be used. And when it's not, round-robin scheduling
52 // will be used instead.
53 //
54 // In the round robin scheduling algorithm, a stream's virtual finish time will
55 // just increment by one (1) after having produced a chunk, which results in a
56 // round-robin scheduling.
57 //
58 // In WFQ scheduling algorithm, a stream's virtual finish time will be defined
59 // as the number of bytes in the next fragment to be sent, multiplied by the
60 // inverse of the stream's priority, meaning that a high priority - or a smaller
61 // fragment - results in a closer virtual finish time, compared to a stream with
62 // either a lower priority or a larger fragment to be sent.
63 class StreamScheduler {
64  private:
65   class VirtualTime : public webrtc::StrongAlias<class VirtualTimeTag, double> {
66    public:
VirtualTime(const UnderlyingType & v)67     constexpr explicit VirtualTime(const UnderlyingType& v)
68         : webrtc::StrongAlias<class VirtualTimeTag, double>(v) {}
69 
Zero()70     static constexpr VirtualTime Zero() { return VirtualTime(0); }
71   };
72   class InverseWeight
73       : public webrtc::StrongAlias<class InverseWeightTag, double> {
74    public:
InverseWeight(StreamPriority priority)75     constexpr explicit InverseWeight(StreamPriority priority)
76         : webrtc::StrongAlias<class InverseWeightTag, double>(
77               1.0 / std::max(static_cast<double>(*priority), 0.000001)) {}
78   };
79 
80  public:
81   class StreamProducer {
82    public:
83     virtual ~StreamProducer() = default;
84 
85     // Produces a fragment of data to send. The current wall time is specified
86     // as `now` and should be used to skip chunks with expired limited lifetime.
87     // The parameter `max_size` specifies the maximum amount of actual payload
88     // that may be returned. If these constraints prevents the stream from
89     // sending some data, `absl::nullopt` should be returned.
90     virtual absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
91                                                           size_t max_size) = 0;
92 
93     // Returns the number of payload bytes that is scheduled to be sent in the
94     // next enqueued message, or zero if there are no enqueued messages or if
95     // the stream has been actively paused.
96     virtual size_t bytes_to_send_in_next_message() const = 0;
97   };
98 
99   class Stream {
100    public:
stream_id()101     StreamID stream_id() const { return stream_id_; }
102 
priority()103     StreamPriority priority() const { return priority_; }
104     void SetPriority(StreamPriority priority);
105 
106     // Will activate the stream _if_ it has any data to send. That is, if the
107     // callback to `bytes_to_send_in_next_message` returns non-zero. If the
108     // callback returns zero, the stream will not be made active.
109     void MaybeMakeActive();
110 
111     // Will remove the stream from the list of active streams, and will not try
112     // to produce data from it. To make it active again, call `MaybeMakeActive`.
113     void MakeInactive();
114 
115     // Make the scheduler move to another message, or another stream. This is
116     // used to abort the scheduler from continuing producing fragments for the
117     // current message in case it's deleted.
ForceReschedule()118     void ForceReschedule() { parent_.ForceReschedule(); }
119 
120    private:
121     friend class StreamScheduler;
122 
Stream(StreamScheduler * parent,StreamProducer * producer,StreamID stream_id,StreamPriority priority)123     Stream(StreamScheduler* parent,
124            StreamProducer* producer,
125            StreamID stream_id,
126            StreamPriority priority)
127         : parent_(*parent),
128           producer_(*producer),
129           stream_id_(stream_id),
130           priority_(priority),
131           inverse_weight_(priority) {}
132 
133     // Produces a message from this stream. This will only be called on streams
134     // that have data.
135     absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);
136 
137     void MakeActive(size_t bytes_to_send_next);
138     void ForceMarkInactive();
139 
current_time()140     VirtualTime current_time() const { return current_virtual_time_; }
next_finish_time()141     VirtualTime next_finish_time() const { return next_finish_time_; }
bytes_to_send_in_next_message()142     size_t bytes_to_send_in_next_message() const {
143       return producer_.bytes_to_send_in_next_message();
144     }
145 
146     VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const;
147 
148     StreamScheduler& parent_;
149     StreamProducer& producer_;
150     const StreamID stream_id_;
151     StreamPriority priority_;
152     InverseWeight inverse_weight_;
153     // This outgoing stream's "current" virtual_time.
154     VirtualTime current_virtual_time_ = VirtualTime::Zero();
155     VirtualTime next_finish_time_ = VirtualTime::Zero();
156   };
157 
158   // The `mtu` parameter represents the maximum SCTP packet size, which should
159   // be the same as `DcSctpOptions::mtu`.
StreamScheduler(size_t mtu)160   explicit StreamScheduler(size_t mtu)
161       : max_payload_bytes_(mtu - SctpPacket::kHeaderSize -
162                            IDataChunk::kHeaderSize) {}
163 
CreateStream(StreamProducer * producer,StreamID stream_id,StreamPriority priority)164   std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
165                                        StreamID stream_id,
166                                        StreamPriority priority) {
167     return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
168   }
169 
EnableMessageInterleaving(bool enabled)170   void EnableMessageInterleaving(bool enabled) {
171     enable_message_interleaving_ = enabled;
172   }
173 
174   // Makes the scheduler stop producing message from the current stream and
175   // re-evaluates which stream to produce from.
ForceReschedule()176   void ForceReschedule() { currently_sending_a_message_ = false; }
177 
178   // Produces a fragment of data to send. The current wall time is specified as
179   // `now` and will be used to skip chunks with expired limited lifetime. The
180   // parameter `max_size` specifies the maximum amount of actual payload that
181   // may be returned. If no data can be produced, `absl::nullopt` is returned.
182   absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);
183 
184   std::set<StreamID> ActiveStreamsForTesting() const;
185 
186  private:
187   struct ActiveStreamComparator {
188     // Ordered by virtual finish time (primary), stream-id (secondary).
operatorActiveStreamComparator189     bool operator()(Stream* a, Stream* b) const {
190       VirtualTime a_vft = a->next_finish_time();
191       VirtualTime b_vft = b->next_finish_time();
192       if (a_vft == b_vft) {
193         return a->stream_id() < b->stream_id();
194       }
195       return a_vft < b_vft;
196     }
197   };
198 
199   bool IsConsistent() const;
200 
201   const size_t max_payload_bytes_;
202 
203   // The current virtual time, as defined in the WFQ algorithm.
204   VirtualTime virtual_time_ = VirtualTime::Zero();
205 
206   // The current stream to send chunks from.
207   Stream* current_stream_ = nullptr;
208 
209   bool enable_message_interleaving_ = false;
210 
211   // Indicates if the streams is currently sending a message, and should then
212   // - if message interleaving is not enabled - continue sending from this
213   // stream until that message has been sent in full.
214   bool currently_sending_a_message_ = false;
215 
216   // The currently active streams, ordered by virtual finish time.
217   webrtc::flat_set<Stream*, ActiveStreamComparator> active_streams_;
218 };
219 
220 }  // namespace dcsctp
221 
222 #endif  // NET_DCSCTP_TX_STREAM_SCHEDULER_H_
223