xref: /aosp_15_r20/external/webrtc/net/dcsctp/tx/rr_send_queue.h (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
11 #define NET_DCSCTP_TX_RR_SEND_QUEUE_H_
12 
13 #include <cstdint>
14 #include <deque>
15 #include <map>
16 #include <memory>
17 #include <string>
18 #include <utility>
19 #include <vector>
20 
21 #include "absl/algorithm/container.h"
22 #include "absl/strings/string_view.h"
23 #include "absl/types/optional.h"
24 #include "api/array_view.h"
25 #include "net/dcsctp/public/dcsctp_message.h"
26 #include "net/dcsctp/public/dcsctp_socket.h"
27 #include "net/dcsctp/public/types.h"
28 #include "net/dcsctp/tx/send_queue.h"
29 #include "net/dcsctp/tx/stream_scheduler.h"
30 
31 namespace dcsctp {
32 
33 // The Round Robin SendQueue holds all messages that the client wants to send,
34 // but that haven't yet been split into chunks and fully sent on the wire.
35 //
36 // As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
37 // it will cycle to send messages from different streams. It will send all
38 // fragments from one message before continuing with a different message on
39 // possibly a different stream, until support for message interleaving has been
40 // implemented.
41 //
42 // As messages can be (requested to be) sent before the connection is properly
43 // established, this send queue is always present - even for closed connections.
44 //
45 // The send queue may trigger callbacks:
46 //  * `OnBufferedAmountLow`, `OnTotalBufferedAmountLow`
47 //    These will be triggered as defined in their documentation.
48 //  * `OnLifecycleMessageExpired(/*maybe_delivered=*/false)`, `OnLifecycleEnd`
49 //    These will be triggered when messages have been expired, abandoned or
50 //    discarded from the send queue. If a message is fully produced, meaning
51 //    that the last fragment has been produced, the responsibility to send
52 //    lifecycle events is then transferred to the retransmission queue, which
53 //    is the one asking to produce the message.
54 class RRSendQueue : public SendQueue {
55  public:
56   RRSendQueue(absl::string_view log_prefix,
57               DcSctpSocketCallbacks* callbacks,
58               size_t buffer_size,
59               size_t mtu,
60               StreamPriority default_priority,
61               size_t total_buffered_amount_low_threshold);
62 
63   // Indicates if the buffer is full. Note that it's up to the caller to ensure
64   // that the buffer is not full prior to adding new items to it.
65   bool IsFull() const;
66   // Indicates if the buffer is empty.
67   bool IsEmpty() const;
68 
69   // Adds the message to be sent using the `send_options` provided. The current
70   // time should be in `now`. Note that it's the responsibility of the caller to
71   // ensure that the buffer is not full (by calling `IsFull`) before adding
72   // messages to it.
73   void Add(TimeMs now,
74            DcSctpMessage message,
75            const SendOptions& send_options = {});
76 
77   // Implementation of `SendQueue`.
78   absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
79   bool Discard(IsUnordered unordered,
80                StreamID stream_id,
81                MID message_id) override;
82   void PrepareResetStream(StreamID streams) override;
83   bool HasStreamsReadyToBeReset() const override;
84   std::vector<StreamID> GetStreamsReadyToBeReset() override;
85   void CommitResetStreams() override;
86   void RollbackResetStreams() override;
87   void Reset() override;
88   size_t buffered_amount(StreamID stream_id) const override;
total_buffered_amount()89   size_t total_buffered_amount() const override {
90     return total_buffered_amount_.value();
91   }
92   size_t buffered_amount_low_threshold(StreamID stream_id) const override;
93   void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
EnableMessageInterleaving(bool enabled)94   void EnableMessageInterleaving(bool enabled) override {
95     scheduler_.EnableMessageInterleaving(enabled);
96   }
97 
98   void SetStreamPriority(StreamID stream_id, StreamPriority priority);
99   StreamPriority GetStreamPriority(StreamID stream_id) const;
100   HandoverReadinessStatus GetHandoverReadiness() const;
101   void AddHandoverState(DcSctpSocketHandoverState& state);
102   void RestoreFromState(const DcSctpSocketHandoverState& state);
103 
104  private:
105   struct MessageAttributes {
106     IsUnordered unordered;
107     MaxRetransmits max_retransmissions;
108     TimeMs expires_at;
109     LifecycleId lifecycle_id;
110   };
111 
112   // Represents a value and a "low threshold" that when the value reaches or
113   // goes under the "low threshold", will trigger `on_threshold_reached`
114   // callback.
115   class ThresholdWatcher {
116    public:
ThresholdWatcher(std::function<void ()> on_threshold_reached)117     explicit ThresholdWatcher(std::function<void()> on_threshold_reached)
118         : on_threshold_reached_(std::move(on_threshold_reached)) {}
119     // Increases the value.
Increase(size_t bytes)120     void Increase(size_t bytes) { value_ += bytes; }
121     // Decreases the value and triggers `on_threshold_reached` if it's at or
122     // below `low_threshold()`.
123     void Decrease(size_t bytes);
124 
value()125     size_t value() const { return value_; }
low_threshold()126     size_t low_threshold() const { return low_threshold_; }
127     void SetLowThreshold(size_t low_threshold);
128 
129    private:
130     const std::function<void()> on_threshold_reached_;
131     size_t value_ = 0;
132     size_t low_threshold_ = 0;
133   };
134 
135   // Per-stream information.
136   class OutgoingStream : public StreamScheduler::StreamProducer {
137    public:
138     OutgoingStream(
139         RRSendQueue* parent,
140         StreamScheduler* scheduler,
141         StreamID stream_id,
142         StreamPriority priority,
143         std::function<void()> on_buffered_amount_low,
144         const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
145         : parent_(*parent),
146           scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
147           next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
148           next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
149           next_ssn_(SSN(state ? state->next_ssn : 0)),
150           buffered_amount_(std::move(on_buffered_amount_low)) {}
151 
stream_id()152     StreamID stream_id() const { return scheduler_stream_->stream_id(); }
153 
154     // Enqueues a message to this stream.
155     void Add(DcSctpMessage message, MessageAttributes attributes);
156 
157     // Implementing `StreamScheduler::StreamProducer`.
158     absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
159                                                   size_t max_size) override;
160     size_t bytes_to_send_in_next_message() const override;
161 
buffered_amount()162     const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
buffered_amount()163     ThresholdWatcher& buffered_amount() { return buffered_amount_; }
164 
165     // Discards a partially sent message, see `SendQueue::Discard`.
166     bool Discard(IsUnordered unordered, MID message_id);
167 
168     // Pauses this stream, which is used before resetting it.
169     void Pause();
170 
171     // Resumes a paused stream.
172     void Resume();
173 
IsReadyToBeReset()174     bool IsReadyToBeReset() const {
175       return pause_state_ == PauseState::kPaused;
176     }
177 
IsResetting()178     bool IsResetting() const { return pause_state_ == PauseState::kResetting; }
179 
SetAsResetting()180     void SetAsResetting() {
181       RTC_DCHECK(pause_state_ == PauseState::kPaused);
182       pause_state_ = PauseState::kResetting;
183     }
184 
185     // Resets this stream, meaning MIDs and SSNs are set to zero.
186     void Reset();
187 
188     // Indicates if this stream has a partially sent message in it.
189     bool has_partially_sent_message() const;
190 
priority()191     StreamPriority priority() const { return scheduler_stream_->priority(); }
SetPriority(StreamPriority priority)192     void SetPriority(StreamPriority priority) {
193       scheduler_stream_->SetPriority(priority);
194     }
195 
196     void AddHandoverState(
197         DcSctpSocketHandoverState::OutgoingStream& state) const;
198 
199    private:
200     // Streams are paused before they can be reset. To reset a stream, the
201     // socket sends an outgoing stream reset command with the TSN of the last
202     // fragment of the last message, so that receivers and senders can agree on
203     // when it stopped. And if the send queue is in the middle of sending a
204     // message, and without fragments not yet sent and without TSNs allocated to
205     // them, it will keep sending data until that message has ended.
206     enum class PauseState {
207       // The stream is not paused, and not scheduled to be reset.
208       kNotPaused,
209       // The stream has requested to be reset/paused but is still producing
210       // fragments of a message that hasn't ended yet. When it does, it will
211       // transition to the `kPaused` state.
212       kPending,
213       // The stream is fully paused and can be reset.
214       kPaused,
215       // The stream has been added to an outgoing stream reset request and a
216       // response from the peer hasn't been received yet.
217       kResetting,
218     };
219 
220     // An enqueued message and metadata.
221     struct Item {
ItemItem222       explicit Item(DcSctpMessage msg, MessageAttributes attributes)
223           : message(std::move(msg)),
224             attributes(std::move(attributes)),
225             remaining_offset(0),
226             remaining_size(message.payload().size()) {}
227       DcSctpMessage message;
228       MessageAttributes attributes;
229       // The remaining payload (offset and size) to be sent, when it has been
230       // fragmented.
231       size_t remaining_offset;
232       size_t remaining_size;
233       // If set, an allocated Message ID and SSN. Will be allocated when the
234       // first fragment is sent.
235       absl::optional<MID> message_id = absl::nullopt;
236       absl::optional<SSN> ssn = absl::nullopt;
237       // The current Fragment Sequence Number, incremented for each fragment.
238       FSN current_fsn = FSN(0);
239     };
240 
241     bool IsConsistent() const;
242     void HandleMessageExpired(OutgoingStream::Item& item);
243 
244     RRSendQueue& parent_;
245 
246     const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
247 
248     PauseState pause_state_ = PauseState::kNotPaused;
249     // MIDs are different for unordered and ordered messages sent on a stream.
250     MID next_unordered_mid_;
251     MID next_ordered_mid_;
252 
253     SSN next_ssn_;
254     // Enqueued messages, and metadata.
255     std::deque<Item> items_;
256 
257     // The current amount of buffered data.
258     ThresholdWatcher buffered_amount_;
259   };
260 
261   bool IsConsistent() const;
262   OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
263   absl::optional<DataToSend> Produce(
264       std::map<StreamID, OutgoingStream>::iterator it,
265       TimeMs now,
266       size_t max_size);
267 
268   const std::string log_prefix_;
269   DcSctpSocketCallbacks& callbacks_;
270   const size_t buffer_size_;
271   const StreamPriority default_priority_;
272   StreamScheduler scheduler_;
273 
274   // The total amount of buffer data, for all streams.
275   ThresholdWatcher total_buffered_amount_;
276 
277   // All streams, and messages added to those.
278   std::map<StreamID, OutgoingStream> streams_;
279 };
280 }  // namespace dcsctp
281 
282 #endif  // NET_DCSCTP_TX_RR_SEND_QUEUE_H_
283