1 /*
2 * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "net/dcsctp/rx/interleaved_reassembly_streams.h"
11
12 #include <stddef.h>
13
14 #include <cstdint>
15 #include <functional>
16 #include <iterator>
17 #include <map>
18 #include <numeric>
19 #include <unordered_map>
20 #include <utility>
21 #include <vector>
22
23 #include "absl/algorithm/container.h"
24 #include "api/array_view.h"
25 #include "net/dcsctp/common/sequence_numbers.h"
26 #include "net/dcsctp/packet/chunk/forward_tsn_common.h"
27 #include "net/dcsctp/packet/data.h"
28 #include "net/dcsctp/public/types.h"
29 #include "rtc_base/logging.h"
30
31 namespace dcsctp {
32
InterleavedReassemblyStreams(absl::string_view log_prefix,OnAssembledMessage on_assembled_message)33 InterleavedReassemblyStreams::InterleavedReassemblyStreams(
34 absl::string_view log_prefix,
35 OnAssembledMessage on_assembled_message)
36 : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {}
37
TryToAssembleMessage(UnwrappedMID mid)38 size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
39 UnwrappedMID mid) {
40 std::map<UnwrappedMID, ChunkMap>::const_iterator it =
41 chunks_by_mid_.find(mid);
42 if (it == chunks_by_mid_.end()) {
43 RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
44 << *mid.Wrap() << " - no chunks";
45 return 0;
46 }
47 const ChunkMap& chunks = it->second;
48 if (!chunks.begin()->second.second.is_beginning ||
49 !chunks.rbegin()->second.second.is_end) {
50 RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
51 << *mid.Wrap() << "- missing beginning or end";
52 return 0;
53 }
54 int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first;
55 if (fsn_diff != (static_cast<int64_t>(chunks.size()) - 1)) {
56 RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
57 << *mid.Wrap() << "- not all chunks exist (have "
58 << chunks.size() << ", expect " << (fsn_diff + 1)
59 << ")";
60 return 0;
61 }
62
63 size_t removed_bytes = AssembleMessage(chunks);
64 RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
65 << *mid.Wrap() << " - succeeded and removed "
66 << removed_bytes;
67
68 chunks_by_mid_.erase(mid);
69 return removed_bytes;
70 }
71
AssembleMessage(const ChunkMap & tsn_chunks)72 size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
73 const ChunkMap& tsn_chunks) {
74 size_t count = tsn_chunks.size();
75 if (count == 1) {
76 // Fast path - zero-copy
77 const Data& data = tsn_chunks.begin()->second.second;
78 size_t payload_size = data.size();
79 UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first};
80 DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
81 parent_.on_assembled_message_(tsns, std::move(message));
82 return payload_size;
83 }
84
85 // Slow path - will need to concatenate the payload.
86 std::vector<UnwrappedTSN> tsns;
87 tsns.reserve(count);
88
89 std::vector<uint8_t> payload;
90 size_t payload_size = absl::c_accumulate(
91 tsn_chunks, 0,
92 [](size_t v, const auto& p) { return v + p.second.second.size(); });
93 payload.reserve(payload_size);
94
95 for (auto& item : tsn_chunks) {
96 const UnwrappedTSN tsn = item.second.first;
97 const Data& data = item.second.second;
98 tsns.push_back(tsn);
99 payload.insert(payload.end(), data.payload.begin(), data.payload.end());
100 }
101
102 const Data& data = tsn_chunks.begin()->second.second;
103
104 DcSctpMessage message(data.stream_id, data.ppid, std::move(payload));
105 parent_.on_assembled_message_(tsns, std::move(message));
106 return payload_size;
107 }
108
EraseTo(MID message_id)109 size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) {
110 UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id);
111
112 size_t removed_bytes = 0;
113 auto it = chunks_by_mid_.begin();
114 while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) {
115 removed_bytes += absl::c_accumulate(
116 it->second, 0,
117 [](size_t r2, const auto& q) { return r2 + q.second.second.size(); });
118 it = chunks_by_mid_.erase(it);
119 }
120
121 if (!stream_id_.unordered) {
122 // For ordered streams, erasing a message might suddenly unblock that queue
123 // and allow it to deliver any following received messages.
124 if (unwrapped_mid >= next_mid_) {
125 next_mid_ = unwrapped_mid.next_value();
126 }
127
128 removed_bytes += TryToAssembleMessages();
129 }
130
131 return removed_bytes;
132 }
133
Add(UnwrappedTSN tsn,Data data)134 int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
135 RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered);
136 RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id);
137 int queued_bytes = data.size();
138 UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id);
139 FSN fsn = data.fsn;
140 auto [unused, inserted] =
141 chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
142 if (!inserted) {
143 return 0;
144 }
145
146 if (stream_id_.unordered) {
147 queued_bytes -= TryToAssembleMessage(mid);
148 } else {
149 if (mid == next_mid_) {
150 queued_bytes -= TryToAssembleMessages();
151 }
152 }
153
154 return queued_bytes;
155 }
156
TryToAssembleMessages()157 size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() {
158 size_t removed_bytes = 0;
159
160 for (;;) {
161 size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_);
162 if (removed_bytes_this_iter == 0) {
163 break;
164 }
165
166 removed_bytes += removed_bytes_this_iter;
167 next_mid_.Increment();
168 }
169 return removed_bytes;
170 }
171
AddHandoverState(DcSctpSocketHandoverState & state) const172 void InterleavedReassemblyStreams::Stream::AddHandoverState(
173 DcSctpSocketHandoverState& state) const {
174 if (stream_id_.unordered) {
175 DcSctpSocketHandoverState::UnorderedStream state_stream;
176 state_stream.id = stream_id_.stream_id.value();
177 state.rx.unordered_streams.push_back(std::move(state_stream));
178 } else {
179 DcSctpSocketHandoverState::OrderedStream state_stream;
180 state_stream.id = stream_id_.stream_id.value();
181 state_stream.next_ssn = next_mid_.Wrap().value();
182 state.rx.ordered_streams.push_back(std::move(state_stream));
183 }
184 }
185
186 InterleavedReassemblyStreams::Stream&
GetOrCreateStream(const FullStreamId & stream_id)187 InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) {
188 auto it = streams_.find(stream_id);
189 if (it == streams_.end()) {
190 it =
191 streams_
192 .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
193 std::forward_as_tuple(stream_id, this))
194 .first;
195 }
196 return it->second;
197 }
198
Add(UnwrappedTSN tsn,Data data)199 int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
200 return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id))
201 .Add(tsn, std::move(data));
202 }
203
HandleForwardTsn(UnwrappedTSN new_cumulative_ack_tsn,rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams)204 size_t InterleavedReassemblyStreams::HandleForwardTsn(
205 UnwrappedTSN new_cumulative_ack_tsn,
206 rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
207 size_t removed_bytes = 0;
208 for (const auto& skipped : skipped_streams) {
209 removed_bytes +=
210 GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id))
211 .EraseTo(skipped.message_id);
212 }
213 return removed_bytes;
214 }
215
ResetStreams(rtc::ArrayView<const StreamID> stream_ids)216 void InterleavedReassemblyStreams::ResetStreams(
217 rtc::ArrayView<const StreamID> stream_ids) {
218 if (stream_ids.empty()) {
219 for (auto& entry : streams_) {
220 entry.second.Reset();
221 }
222 } else {
223 for (StreamID stream_id : stream_ids) {
224 GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset();
225 GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset();
226 }
227 }
228 }
229
GetHandoverReadiness() const230 HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness()
231 const {
232 HandoverReadinessStatus status;
233 for (const auto& [stream_id, stream] : streams_) {
234 if (stream.has_unassembled_chunks()) {
235 status.Add(
236 stream_id.unordered
237 ? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks
238 : HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
239 break;
240 }
241 }
242 return status;
243 }
244
AddHandoverState(DcSctpSocketHandoverState & state)245 void InterleavedReassemblyStreams::AddHandoverState(
246 DcSctpSocketHandoverState& state) {
247 for (const auto& [unused, stream] : streams_) {
248 stream.AddHandoverState(state);
249 }
250 }
251
RestoreFromState(const DcSctpSocketHandoverState & state)252 void InterleavedReassemblyStreams::RestoreFromState(
253 const DcSctpSocketHandoverState& state) {
254 // Validate that the component is in pristine state.
255 RTC_DCHECK(streams_.empty());
256
257 for (const DcSctpSocketHandoverState::OrderedStream& state :
258 state.rx.ordered_streams) {
259 FullStreamId stream_id(IsUnordered(false), StreamID(state.id));
260 streams_.emplace(
261 std::piecewise_construct, std::forward_as_tuple(stream_id),
262 std::forward_as_tuple(stream_id, this, MID(state.next_ssn)));
263 }
264 for (const DcSctpSocketHandoverState::UnorderedStream& state :
265 state.rx.unordered_streams) {
266 FullStreamId stream_id(IsUnordered(true), StreamID(state.id));
267 streams_.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
268 std::forward_as_tuple(stream_id, this));
269 }
270 }
271
272 } // namespace dcsctp
273