xref: /aosp_15_r20/external/webrtc/net/dcsctp/rx/interleaved_reassembly_streams.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "net/dcsctp/rx/interleaved_reassembly_streams.h"
11 
12 #include <stddef.h>
13 
14 #include <cstdint>
15 #include <functional>
16 #include <iterator>
17 #include <map>
18 #include <numeric>
19 #include <unordered_map>
20 #include <utility>
21 #include <vector>
22 
23 #include "absl/algorithm/container.h"
24 #include "api/array_view.h"
25 #include "net/dcsctp/common/sequence_numbers.h"
26 #include "net/dcsctp/packet/chunk/forward_tsn_common.h"
27 #include "net/dcsctp/packet/data.h"
28 #include "net/dcsctp/public/types.h"
29 #include "rtc_base/logging.h"
30 
31 namespace dcsctp {
32 
InterleavedReassemblyStreams(absl::string_view log_prefix,OnAssembledMessage on_assembled_message)33 InterleavedReassemblyStreams::InterleavedReassemblyStreams(
34     absl::string_view log_prefix,
35     OnAssembledMessage on_assembled_message)
36     : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {}
37 
TryToAssembleMessage(UnwrappedMID mid)38 size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
39     UnwrappedMID mid) {
40   std::map<UnwrappedMID, ChunkMap>::const_iterator it =
41       chunks_by_mid_.find(mid);
42   if (it == chunks_by_mid_.end()) {
43     RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
44                          << *mid.Wrap() << " - no chunks";
45     return 0;
46   }
47   const ChunkMap& chunks = it->second;
48   if (!chunks.begin()->second.second.is_beginning ||
49       !chunks.rbegin()->second.second.is_end) {
50     RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
51                          << *mid.Wrap() << "- missing beginning or end";
52     return 0;
53   }
54   int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first;
55   if (fsn_diff != (static_cast<int64_t>(chunks.size()) - 1)) {
56     RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
57                          << *mid.Wrap() << "- not all chunks exist (have "
58                          << chunks.size() << ", expect " << (fsn_diff + 1)
59                          << ")";
60     return 0;
61   }
62 
63   size_t removed_bytes = AssembleMessage(chunks);
64   RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
65                        << *mid.Wrap() << " - succeeded and removed "
66                        << removed_bytes;
67 
68   chunks_by_mid_.erase(mid);
69   return removed_bytes;
70 }
71 
AssembleMessage(const ChunkMap & tsn_chunks)72 size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
73     const ChunkMap& tsn_chunks) {
74   size_t count = tsn_chunks.size();
75   if (count == 1) {
76     // Fast path - zero-copy
77     const Data& data = tsn_chunks.begin()->second.second;
78     size_t payload_size = data.size();
79     UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first};
80     DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
81     parent_.on_assembled_message_(tsns, std::move(message));
82     return payload_size;
83   }
84 
85   // Slow path - will need to concatenate the payload.
86   std::vector<UnwrappedTSN> tsns;
87   tsns.reserve(count);
88 
89   std::vector<uint8_t> payload;
90   size_t payload_size = absl::c_accumulate(
91       tsn_chunks, 0,
92       [](size_t v, const auto& p) { return v + p.second.second.size(); });
93   payload.reserve(payload_size);
94 
95   for (auto& item : tsn_chunks) {
96     const UnwrappedTSN tsn = item.second.first;
97     const Data& data = item.second.second;
98     tsns.push_back(tsn);
99     payload.insert(payload.end(), data.payload.begin(), data.payload.end());
100   }
101 
102   const Data& data = tsn_chunks.begin()->second.second;
103 
104   DcSctpMessage message(data.stream_id, data.ppid, std::move(payload));
105   parent_.on_assembled_message_(tsns, std::move(message));
106   return payload_size;
107 }
108 
EraseTo(MID message_id)109 size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) {
110   UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id);
111 
112   size_t removed_bytes = 0;
113   auto it = chunks_by_mid_.begin();
114   while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) {
115     removed_bytes += absl::c_accumulate(
116         it->second, 0,
117         [](size_t r2, const auto& q) { return r2 + q.second.second.size(); });
118     it = chunks_by_mid_.erase(it);
119   }
120 
121   if (!stream_id_.unordered) {
122     // For ordered streams, erasing a message might suddenly unblock that queue
123     // and allow it to deliver any following received messages.
124     if (unwrapped_mid >= next_mid_) {
125       next_mid_ = unwrapped_mid.next_value();
126     }
127 
128     removed_bytes += TryToAssembleMessages();
129   }
130 
131   return removed_bytes;
132 }
133 
Add(UnwrappedTSN tsn,Data data)134 int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
135   RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered);
136   RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id);
137   int queued_bytes = data.size();
138   UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id);
139   FSN fsn = data.fsn;
140   auto [unused, inserted] =
141       chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
142   if (!inserted) {
143     return 0;
144   }
145 
146   if (stream_id_.unordered) {
147     queued_bytes -= TryToAssembleMessage(mid);
148   } else {
149     if (mid == next_mid_) {
150       queued_bytes -= TryToAssembleMessages();
151     }
152   }
153 
154   return queued_bytes;
155 }
156 
TryToAssembleMessages()157 size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() {
158   size_t removed_bytes = 0;
159 
160   for (;;) {
161     size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_);
162     if (removed_bytes_this_iter == 0) {
163       break;
164     }
165 
166     removed_bytes += removed_bytes_this_iter;
167     next_mid_.Increment();
168   }
169   return removed_bytes;
170 }
171 
AddHandoverState(DcSctpSocketHandoverState & state) const172 void InterleavedReassemblyStreams::Stream::AddHandoverState(
173     DcSctpSocketHandoverState& state) const {
174   if (stream_id_.unordered) {
175     DcSctpSocketHandoverState::UnorderedStream state_stream;
176     state_stream.id = stream_id_.stream_id.value();
177     state.rx.unordered_streams.push_back(std::move(state_stream));
178   } else {
179     DcSctpSocketHandoverState::OrderedStream state_stream;
180     state_stream.id = stream_id_.stream_id.value();
181     state_stream.next_ssn = next_mid_.Wrap().value();
182     state.rx.ordered_streams.push_back(std::move(state_stream));
183   }
184 }
185 
186 InterleavedReassemblyStreams::Stream&
GetOrCreateStream(const FullStreamId & stream_id)187 InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) {
188   auto it = streams_.find(stream_id);
189   if (it == streams_.end()) {
190     it =
191         streams_
192             .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
193                      std::forward_as_tuple(stream_id, this))
194             .first;
195   }
196   return it->second;
197 }
198 
Add(UnwrappedTSN tsn,Data data)199 int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
200   return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id))
201       .Add(tsn, std::move(data));
202 }
203 
HandleForwardTsn(UnwrappedTSN new_cumulative_ack_tsn,rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams)204 size_t InterleavedReassemblyStreams::HandleForwardTsn(
205     UnwrappedTSN new_cumulative_ack_tsn,
206     rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
207   size_t removed_bytes = 0;
208   for (const auto& skipped : skipped_streams) {
209     removed_bytes +=
210         GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id))
211             .EraseTo(skipped.message_id);
212   }
213   return removed_bytes;
214 }
215 
ResetStreams(rtc::ArrayView<const StreamID> stream_ids)216 void InterleavedReassemblyStreams::ResetStreams(
217     rtc::ArrayView<const StreamID> stream_ids) {
218   if (stream_ids.empty()) {
219     for (auto& entry : streams_) {
220       entry.second.Reset();
221     }
222   } else {
223     for (StreamID stream_id : stream_ids) {
224       GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset();
225       GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset();
226     }
227   }
228 }
229 
GetHandoverReadiness() const230 HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness()
231     const {
232   HandoverReadinessStatus status;
233   for (const auto& [stream_id, stream] : streams_) {
234     if (stream.has_unassembled_chunks()) {
235       status.Add(
236           stream_id.unordered
237               ? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks
238               : HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
239       break;
240     }
241   }
242   return status;
243 }
244 
AddHandoverState(DcSctpSocketHandoverState & state)245 void InterleavedReassemblyStreams::AddHandoverState(
246     DcSctpSocketHandoverState& state) {
247   for (const auto& [unused, stream] : streams_) {
248     stream.AddHandoverState(state);
249   }
250 }
251 
RestoreFromState(const DcSctpSocketHandoverState & state)252 void InterleavedReassemblyStreams::RestoreFromState(
253     const DcSctpSocketHandoverState& state) {
254   // Validate that the component is in pristine state.
255   RTC_DCHECK(streams_.empty());
256 
257   for (const DcSctpSocketHandoverState::OrderedStream& state :
258        state.rx.ordered_streams) {
259     FullStreamId stream_id(IsUnordered(false), StreamID(state.id));
260     streams_.emplace(
261         std::piecewise_construct, std::forward_as_tuple(stream_id),
262         std::forward_as_tuple(stream_id, this, MID(state.next_ssn)));
263   }
264   for (const DcSctpSocketHandoverState::UnorderedStream& state :
265        state.rx.unordered_streams) {
266     FullStreamId stream_id(IsUnordered(true), StreamID(state.id));
267     streams_.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
268                      std::forward_as_tuple(stream_id, this));
269   }
270 }
271 
272 }  // namespace dcsctp
273