1 /* 2 * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 #ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_ 11 #define NET_DCSCTP_TX_RR_SEND_QUEUE_H_ 12 13 #include <cstdint> 14 #include <deque> 15 #include <map> 16 #include <memory> 17 #include <string> 18 #include <utility> 19 #include <vector> 20 21 #include "absl/algorithm/container.h" 22 #include "absl/strings/string_view.h" 23 #include "absl/types/optional.h" 24 #include "api/array_view.h" 25 #include "net/dcsctp/public/dcsctp_message.h" 26 #include "net/dcsctp/public/dcsctp_socket.h" 27 #include "net/dcsctp/public/types.h" 28 #include "net/dcsctp/tx/send_queue.h" 29 #include "net/dcsctp/tx/stream_scheduler.h" 30 31 namespace dcsctp { 32 33 // The Round Robin SendQueue holds all messages that the client wants to send, 34 // but that haven't yet been split into chunks and fully sent on the wire. 35 // 36 // As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2, 37 // it will cycle to send messages from different streams. It will send all 38 // fragments from one message before continuing with a different message on 39 // possibly a different stream, until support for message interleaving has been 40 // implemented. 41 // 42 // As messages can be (requested to be) sent before the connection is properly 43 // established, this send queue is always present - even for closed connections. 44 // 45 // The send queue may trigger callbacks: 46 // * `OnBufferedAmountLow`, `OnTotalBufferedAmountLow` 47 // These will be triggered as defined in their documentation. 48 // * `OnLifecycleMessageExpired(/*maybe_delivered=*/false)`, `OnLifecycleEnd` 49 // These will be triggered when messages have been expired, abandoned or 50 // discarded from the send queue. If a message is fully produced, meaning 51 // that the last fragment has been produced, the responsibility to send 52 // lifecycle events is then transferred to the retransmission queue, which 53 // is the one asking to produce the message. 54 class RRSendQueue : public SendQueue { 55 public: 56 RRSendQueue(absl::string_view log_prefix, 57 DcSctpSocketCallbacks* callbacks, 58 size_t buffer_size, 59 size_t mtu, 60 StreamPriority default_priority, 61 size_t total_buffered_amount_low_threshold); 62 63 // Indicates if the buffer is full. Note that it's up to the caller to ensure 64 // that the buffer is not full prior to adding new items to it. 65 bool IsFull() const; 66 // Indicates if the buffer is empty. 67 bool IsEmpty() const; 68 69 // Adds the message to be sent using the `send_options` provided. The current 70 // time should be in `now`. Note that it's the responsibility of the caller to 71 // ensure that the buffer is not full (by calling `IsFull`) before adding 72 // messages to it. 73 void Add(TimeMs now, 74 DcSctpMessage message, 75 const SendOptions& send_options = {}); 76 77 // Implementation of `SendQueue`. 78 absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override; 79 bool Discard(IsUnordered unordered, 80 StreamID stream_id, 81 MID message_id) override; 82 void PrepareResetStream(StreamID streams) override; 83 bool HasStreamsReadyToBeReset() const override; 84 std::vector<StreamID> GetStreamsReadyToBeReset() override; 85 void CommitResetStreams() override; 86 void RollbackResetStreams() override; 87 void Reset() override; 88 size_t buffered_amount(StreamID stream_id) const override; total_buffered_amount()89 size_t total_buffered_amount() const override { 90 return total_buffered_amount_.value(); 91 } 92 size_t buffered_amount_low_threshold(StreamID stream_id) const override; 93 void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override; EnableMessageInterleaving(bool enabled)94 void EnableMessageInterleaving(bool enabled) override { 95 scheduler_.EnableMessageInterleaving(enabled); 96 } 97 98 void SetStreamPriority(StreamID stream_id, StreamPriority priority); 99 StreamPriority GetStreamPriority(StreamID stream_id) const; 100 HandoverReadinessStatus GetHandoverReadiness() const; 101 void AddHandoverState(DcSctpSocketHandoverState& state); 102 void RestoreFromState(const DcSctpSocketHandoverState& state); 103 104 private: 105 struct MessageAttributes { 106 IsUnordered unordered; 107 MaxRetransmits max_retransmissions; 108 TimeMs expires_at; 109 LifecycleId lifecycle_id; 110 }; 111 112 // Represents a value and a "low threshold" that when the value reaches or 113 // goes under the "low threshold", will trigger `on_threshold_reached` 114 // callback. 115 class ThresholdWatcher { 116 public: ThresholdWatcher(std::function<void ()> on_threshold_reached)117 explicit ThresholdWatcher(std::function<void()> on_threshold_reached) 118 : on_threshold_reached_(std::move(on_threshold_reached)) {} 119 // Increases the value. Increase(size_t bytes)120 void Increase(size_t bytes) { value_ += bytes; } 121 // Decreases the value and triggers `on_threshold_reached` if it's at or 122 // below `low_threshold()`. 123 void Decrease(size_t bytes); 124 value()125 size_t value() const { return value_; } low_threshold()126 size_t low_threshold() const { return low_threshold_; } 127 void SetLowThreshold(size_t low_threshold); 128 129 private: 130 const std::function<void()> on_threshold_reached_; 131 size_t value_ = 0; 132 size_t low_threshold_ = 0; 133 }; 134 135 // Per-stream information. 136 class OutgoingStream : public StreamScheduler::StreamProducer { 137 public: 138 OutgoingStream( 139 RRSendQueue* parent, 140 StreamScheduler* scheduler, 141 StreamID stream_id, 142 StreamPriority priority, 143 std::function<void()> on_buffered_amount_low, 144 const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) 145 : parent_(*parent), 146 scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)), 147 next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), 148 next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), 149 next_ssn_(SSN(state ? state->next_ssn : 0)), 150 buffered_amount_(std::move(on_buffered_amount_low)) {} 151 stream_id()152 StreamID stream_id() const { return scheduler_stream_->stream_id(); } 153 154 // Enqueues a message to this stream. 155 void Add(DcSctpMessage message, MessageAttributes attributes); 156 157 // Implementing `StreamScheduler::StreamProducer`. 158 absl::optional<SendQueue::DataToSend> Produce(TimeMs now, 159 size_t max_size) override; 160 size_t bytes_to_send_in_next_message() const override; 161 buffered_amount()162 const ThresholdWatcher& buffered_amount() const { return buffered_amount_; } buffered_amount()163 ThresholdWatcher& buffered_amount() { return buffered_amount_; } 164 165 // Discards a partially sent message, see `SendQueue::Discard`. 166 bool Discard(IsUnordered unordered, MID message_id); 167 168 // Pauses this stream, which is used before resetting it. 169 void Pause(); 170 171 // Resumes a paused stream. 172 void Resume(); 173 IsReadyToBeReset()174 bool IsReadyToBeReset() const { 175 return pause_state_ == PauseState::kPaused; 176 } 177 IsResetting()178 bool IsResetting() const { return pause_state_ == PauseState::kResetting; } 179 SetAsResetting()180 void SetAsResetting() { 181 RTC_DCHECK(pause_state_ == PauseState::kPaused); 182 pause_state_ = PauseState::kResetting; 183 } 184 185 // Resets this stream, meaning MIDs and SSNs are set to zero. 186 void Reset(); 187 188 // Indicates if this stream has a partially sent message in it. 189 bool has_partially_sent_message() const; 190 priority()191 StreamPriority priority() const { return scheduler_stream_->priority(); } SetPriority(StreamPriority priority)192 void SetPriority(StreamPriority priority) { 193 scheduler_stream_->SetPriority(priority); 194 } 195 196 void AddHandoverState( 197 DcSctpSocketHandoverState::OutgoingStream& state) const; 198 199 private: 200 // Streams are paused before they can be reset. To reset a stream, the 201 // socket sends an outgoing stream reset command with the TSN of the last 202 // fragment of the last message, so that receivers and senders can agree on 203 // when it stopped. And if the send queue is in the middle of sending a 204 // message, and without fragments not yet sent and without TSNs allocated to 205 // them, it will keep sending data until that message has ended. 206 enum class PauseState { 207 // The stream is not paused, and not scheduled to be reset. 208 kNotPaused, 209 // The stream has requested to be reset/paused but is still producing 210 // fragments of a message that hasn't ended yet. When it does, it will 211 // transition to the `kPaused` state. 212 kPending, 213 // The stream is fully paused and can be reset. 214 kPaused, 215 // The stream has been added to an outgoing stream reset request and a 216 // response from the peer hasn't been received yet. 217 kResetting, 218 }; 219 220 // An enqueued message and metadata. 221 struct Item { ItemItem222 explicit Item(DcSctpMessage msg, MessageAttributes attributes) 223 : message(std::move(msg)), 224 attributes(std::move(attributes)), 225 remaining_offset(0), 226 remaining_size(message.payload().size()) {} 227 DcSctpMessage message; 228 MessageAttributes attributes; 229 // The remaining payload (offset and size) to be sent, when it has been 230 // fragmented. 231 size_t remaining_offset; 232 size_t remaining_size; 233 // If set, an allocated Message ID and SSN. Will be allocated when the 234 // first fragment is sent. 235 absl::optional<MID> message_id = absl::nullopt; 236 absl::optional<SSN> ssn = absl::nullopt; 237 // The current Fragment Sequence Number, incremented for each fragment. 238 FSN current_fsn = FSN(0); 239 }; 240 241 bool IsConsistent() const; 242 void HandleMessageExpired(OutgoingStream::Item& item); 243 244 RRSendQueue& parent_; 245 246 const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_; 247 248 PauseState pause_state_ = PauseState::kNotPaused; 249 // MIDs are different for unordered and ordered messages sent on a stream. 250 MID next_unordered_mid_; 251 MID next_ordered_mid_; 252 253 SSN next_ssn_; 254 // Enqueued messages, and metadata. 255 std::deque<Item> items_; 256 257 // The current amount of buffered data. 258 ThresholdWatcher buffered_amount_; 259 }; 260 261 bool IsConsistent() const; 262 OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); 263 absl::optional<DataToSend> Produce( 264 std::map<StreamID, OutgoingStream>::iterator it, 265 TimeMs now, 266 size_t max_size); 267 268 const std::string log_prefix_; 269 DcSctpSocketCallbacks& callbacks_; 270 const size_t buffer_size_; 271 const StreamPriority default_priority_; 272 StreamScheduler scheduler_; 273 274 // The total amount of buffer data, for all streams. 275 ThresholdWatcher total_buffered_amount_; 276 277 // All streams, and messages added to those. 278 std::map<StreamID, OutgoingStream> streams_; 279 }; 280 } // namespace dcsctp 281 282 #endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_ 283