1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/rtp_packet_history.h"
12
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17
18 #include "modules/include/module_common_types_public.h"
19 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
20 #include "rtc_base/checks.h"
21 #include "rtc_base/logging.h"
22 #include "system_wrappers/include/clock.h"
23
24 namespace webrtc {
25
StoredPacket(std::unique_ptr<RtpPacketToSend> packet,Timestamp send_time,uint64_t insert_order)26 RtpPacketHistory::StoredPacket::StoredPacket(
27 std::unique_ptr<RtpPacketToSend> packet,
28 Timestamp send_time,
29 uint64_t insert_order)
30 : packet_(std::move(packet)),
31 pending_transmission_(false),
32 send_time_(send_time),
33 insert_order_(insert_order),
34 times_retransmitted_(0) {}
35
36 RtpPacketHistory::StoredPacket::StoredPacket(StoredPacket&&) = default;
37 RtpPacketHistory::StoredPacket& RtpPacketHistory::StoredPacket::operator=(
38 RtpPacketHistory::StoredPacket&&) = default;
39 RtpPacketHistory::StoredPacket::~StoredPacket() = default;
40
IncrementTimesRetransmitted(PacketPrioritySet * priority_set)41 void RtpPacketHistory::StoredPacket::IncrementTimesRetransmitted(
42 PacketPrioritySet* priority_set) {
43 // Check if this StoredPacket is in the priority set. If so, we need to remove
44 // it before updating `times_retransmitted_` since that is used in sorting,
45 // and then add it back.
46 const bool in_priority_set = priority_set && priority_set->erase(this) > 0;
47 ++times_retransmitted_;
48 if (in_priority_set) {
49 auto it = priority_set->insert(this);
50 RTC_DCHECK(it.second)
51 << "ERROR: Priority set already contains matching packet! In set: "
52 "insert order = "
53 << (*it.first)->insert_order_
54 << ", times retransmitted = " << (*it.first)->times_retransmitted_
55 << ". Trying to add: insert order = " << insert_order_
56 << ", times retransmitted = " << times_retransmitted_;
57 }
58 }
59
operator ()(StoredPacket * lhs,StoredPacket * rhs) const60 bool RtpPacketHistory::MoreUseful::operator()(StoredPacket* lhs,
61 StoredPacket* rhs) const {
62 // Prefer to send packets we haven't already sent as padding.
63 if (lhs->times_retransmitted() != rhs->times_retransmitted()) {
64 return lhs->times_retransmitted() < rhs->times_retransmitted();
65 }
66 // All else being equal, prefer newer packets.
67 return lhs->insert_order() > rhs->insert_order();
68 }
69
RtpPacketHistory(Clock * clock,bool enable_padding_prio)70 RtpPacketHistory::RtpPacketHistory(Clock* clock, bool enable_padding_prio)
71 : clock_(clock),
72 enable_padding_prio_(enable_padding_prio),
73 number_to_store_(0),
74 mode_(StorageMode::kDisabled),
75 rtt_(TimeDelta::MinusInfinity()),
76 packets_inserted_(0) {}
77
~RtpPacketHistory()78 RtpPacketHistory::~RtpPacketHistory() {}
79
SetStorePacketsStatus(StorageMode mode,size_t number_to_store)80 void RtpPacketHistory::SetStorePacketsStatus(StorageMode mode,
81 size_t number_to_store) {
82 RTC_DCHECK_LE(number_to_store, kMaxCapacity);
83 MutexLock lock(&lock_);
84 if (mode != StorageMode::kDisabled && mode_ != StorageMode::kDisabled) {
85 RTC_LOG(LS_WARNING) << "Purging packet history in order to re-set status.";
86 }
87 Reset();
88 mode_ = mode;
89 number_to_store_ = std::min(kMaxCapacity, number_to_store);
90 }
91
GetStorageMode() const92 RtpPacketHistory::StorageMode RtpPacketHistory::GetStorageMode() const {
93 MutexLock lock(&lock_);
94 return mode_;
95 }
96
SetRtt(TimeDelta rtt)97 void RtpPacketHistory::SetRtt(TimeDelta rtt) {
98 MutexLock lock(&lock_);
99 RTC_DCHECK_GE(rtt, TimeDelta::Zero());
100 rtt_ = rtt;
101 // If storage is not disabled, packets will be removed after a timeout
102 // that depends on the RTT. Changing the RTT may thus cause some packets
103 // become "old" and subject to removal.
104 if (mode_ != StorageMode::kDisabled) {
105 CullOldPackets();
106 }
107 }
108
PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,Timestamp send_time)109 void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
110 Timestamp send_time) {
111 RTC_DCHECK(packet);
112 MutexLock lock(&lock_);
113 if (mode_ == StorageMode::kDisabled) {
114 return;
115 }
116
117 RTC_DCHECK(packet->allow_retransmission());
118 CullOldPackets();
119
120 // Store packet.
121 const uint16_t rtp_seq_no = packet->SequenceNumber();
122 int packet_index = GetPacketIndex(rtp_seq_no);
123 if (packet_index >= 0 &&
124 static_cast<size_t>(packet_index) < packet_history_.size() &&
125 packet_history_[packet_index].packet_ != nullptr) {
126 RTC_LOG(LS_WARNING) << "Duplicate packet inserted: " << rtp_seq_no;
127 // Remove previous packet to avoid inconsistent state.
128 RemovePacket(packet_index);
129 packet_index = GetPacketIndex(rtp_seq_no);
130 }
131
132 // Packet to be inserted ahead of first packet, expand front.
133 for (; packet_index < 0; ++packet_index) {
134 packet_history_.emplace_front();
135 }
136 // Packet to be inserted behind last packet, expand back.
137 while (static_cast<int>(packet_history_.size()) <= packet_index) {
138 packet_history_.emplace_back();
139 }
140
141 RTC_DCHECK_GE(packet_index, 0);
142 RTC_DCHECK_LT(packet_index, packet_history_.size());
143 RTC_DCHECK(packet_history_[packet_index].packet_ == nullptr);
144
145 packet_history_[packet_index] =
146 StoredPacket(std::move(packet), send_time, packets_inserted_++);
147
148 if (enable_padding_prio_) {
149 if (padding_priority_.size() >= kMaxPaddingHistory - 1) {
150 padding_priority_.erase(std::prev(padding_priority_.end()));
151 }
152 auto prio_it = padding_priority_.insert(&packet_history_[packet_index]);
153 RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set.";
154 }
155 }
156
GetPacketAndMarkAsPending(uint16_t sequence_number)157 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
158 uint16_t sequence_number) {
159 return GetPacketAndMarkAsPending(
160 sequence_number, [](const RtpPacketToSend& packet) {
161 return std::make_unique<RtpPacketToSend>(packet);
162 });
163 }
164
GetPacketAndMarkAsPending(uint16_t sequence_number,rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)165 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
166 uint16_t sequence_number,
167 rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
168 encapsulate) {
169 MutexLock lock(&lock_);
170 if (mode_ == StorageMode::kDisabled) {
171 return nullptr;
172 }
173
174 StoredPacket* packet = GetStoredPacket(sequence_number);
175 if (packet == nullptr) {
176 return nullptr;
177 }
178
179 if (packet->pending_transmission_) {
180 // Packet already in pacer queue, ignore this request.
181 return nullptr;
182 }
183
184 if (!VerifyRtt(*packet)) {
185 // Packet already resent within too short a time window, ignore.
186 return nullptr;
187 }
188
189 // Copy and/or encapsulate packet.
190 std::unique_ptr<RtpPacketToSend> encapsulated_packet =
191 encapsulate(*packet->packet_);
192 if (encapsulated_packet) {
193 packet->pending_transmission_ = true;
194 }
195
196 return encapsulated_packet;
197 }
198
MarkPacketAsSent(uint16_t sequence_number)199 void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) {
200 MutexLock lock(&lock_);
201 if (mode_ == StorageMode::kDisabled) {
202 return;
203 }
204
205 StoredPacket* packet = GetStoredPacket(sequence_number);
206 if (packet == nullptr) {
207 return;
208 }
209
210 // Update send-time, mark as no longer in pacer queue, and increment
211 // transmission count.
212 packet->set_send_time(clock_->CurrentTime());
213 packet->pending_transmission_ = false;
214 packet->IncrementTimesRetransmitted(enable_padding_prio_ ? &padding_priority_
215 : nullptr);
216 }
217
GetPacketState(uint16_t sequence_number) const218 bool RtpPacketHistory::GetPacketState(uint16_t sequence_number) const {
219 MutexLock lock(&lock_);
220 if (mode_ == StorageMode::kDisabled) {
221 return false;
222 }
223
224 int packet_index = GetPacketIndex(sequence_number);
225 if (packet_index < 0 ||
226 static_cast<size_t>(packet_index) >= packet_history_.size()) {
227 return false;
228 }
229 const StoredPacket& packet = packet_history_[packet_index];
230 if (packet.packet_ == nullptr) {
231 return false;
232 }
233
234 if (!VerifyRtt(packet)) {
235 return false;
236 }
237
238 return true;
239 }
240
VerifyRtt(const RtpPacketHistory::StoredPacket & packet) const241 bool RtpPacketHistory::VerifyRtt(
242 const RtpPacketHistory::StoredPacket& packet) const {
243 if (packet.times_retransmitted() > 0 &&
244 clock_->CurrentTime() - packet.send_time() < rtt_) {
245 // This packet has already been retransmitted once, and the time since
246 // that even is lower than on RTT. Ignore request as this packet is
247 // likely already in the network pipe.
248 return false;
249 }
250
251 return true;
252 }
253
GetPayloadPaddingPacket()254 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket() {
255 // Default implementation always just returns a copy of the packet.
256 return GetPayloadPaddingPacket([](const RtpPacketToSend& packet) {
257 return std::make_unique<RtpPacketToSend>(packet);
258 });
259 }
260
GetPayloadPaddingPacket(rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)261 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket(
262 rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
263 encapsulate) {
264 MutexLock lock(&lock_);
265 if (mode_ == StorageMode::kDisabled) {
266 return nullptr;
267 }
268
269 StoredPacket* best_packet = nullptr;
270 if (enable_padding_prio_ && !padding_priority_.empty()) {
271 auto best_packet_it = padding_priority_.begin();
272 best_packet = *best_packet_it;
273 } else if (!enable_padding_prio_ && !packet_history_.empty()) {
274 // Prioritization not available, pick the last packet.
275 for (auto it = packet_history_.rbegin(); it != packet_history_.rend();
276 ++it) {
277 if (it->packet_ != nullptr) {
278 best_packet = &(*it);
279 break;
280 }
281 }
282 }
283 if (best_packet == nullptr) {
284 return nullptr;
285 }
286
287 if (best_packet->pending_transmission_) {
288 // Because PacedSender releases it's lock when it calls
289 // GeneratePadding() there is the potential for a race where a new
290 // packet ends up here instead of the regular transmit path. In such a
291 // case, just return empty and it will be picked up on the next
292 // Process() call.
293 return nullptr;
294 }
295
296 auto padding_packet = encapsulate(*best_packet->packet_);
297 if (!padding_packet) {
298 return nullptr;
299 }
300
301 best_packet->set_send_time(clock_->CurrentTime());
302 best_packet->IncrementTimesRetransmitted(
303 enable_padding_prio_ ? &padding_priority_ : nullptr);
304
305 return padding_packet;
306 }
307
CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers)308 void RtpPacketHistory::CullAcknowledgedPackets(
309 rtc::ArrayView<const uint16_t> sequence_numbers) {
310 MutexLock lock(&lock_);
311 for (uint16_t sequence_number : sequence_numbers) {
312 int packet_index = GetPacketIndex(sequence_number);
313 if (packet_index < 0 ||
314 static_cast<size_t>(packet_index) >= packet_history_.size()) {
315 continue;
316 }
317 RemovePacket(packet_index);
318 }
319 }
320
Clear()321 void RtpPacketHistory::Clear() {
322 MutexLock lock(&lock_);
323 Reset();
324 }
325
Reset()326 void RtpPacketHistory::Reset() {
327 packet_history_.clear();
328 padding_priority_.clear();
329 }
330
CullOldPackets()331 void RtpPacketHistory::CullOldPackets() {
332 Timestamp now = clock_->CurrentTime();
333 TimeDelta packet_duration =
334 rtt_.IsFinite()
335 ? std::max(kMinPacketDurationRtt * rtt_, kMinPacketDuration)
336 : kMinPacketDuration;
337 while (!packet_history_.empty()) {
338 if (packet_history_.size() >= kMaxCapacity) {
339 // We have reached the absolute max capacity, remove one packet
340 // unconditionally.
341 RemovePacket(0);
342 continue;
343 }
344
345 const StoredPacket& stored_packet = packet_history_.front();
346 if (stored_packet.pending_transmission_) {
347 // Don't remove packets in the pacer queue, pending tranmission.
348 return;
349 }
350
351 if (stored_packet.send_time() + packet_duration > now) {
352 // Don't cull packets too early to avoid failed retransmission requests.
353 return;
354 }
355
356 if (packet_history_.size() >= number_to_store_ ||
357 stored_packet.send_time() +
358 (packet_duration * kPacketCullingDelayFactor) <=
359 now) {
360 // Too many packets in history, or this packet has timed out. Remove it
361 // and continue.
362 RemovePacket(0);
363 } else {
364 // No more packets can be removed right now.
365 return;
366 }
367 }
368 }
369
RemovePacket(int packet_index)370 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket(
371 int packet_index) {
372 // Move the packet out from the StoredPacket container.
373 std::unique_ptr<RtpPacketToSend> rtp_packet =
374 std::move(packet_history_[packet_index].packet_);
375
376 // Erase from padding priority set, if eligible.
377 if (enable_padding_prio_) {
378 padding_priority_.erase(&packet_history_[packet_index]);
379 }
380
381 if (packet_index == 0) {
382 while (!packet_history_.empty() &&
383 packet_history_.front().packet_ == nullptr) {
384 packet_history_.pop_front();
385 }
386 }
387
388 return rtp_packet;
389 }
390
GetPacketIndex(uint16_t sequence_number) const391 int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const {
392 if (packet_history_.empty()) {
393 return 0;
394 }
395
396 RTC_DCHECK(packet_history_.front().packet_ != nullptr);
397 int first_seq = packet_history_.front().packet_->SequenceNumber();
398 if (first_seq == sequence_number) {
399 return 0;
400 }
401
402 int packet_index = sequence_number - first_seq;
403 constexpr int kSeqNumSpan = std::numeric_limits<uint16_t>::max() + 1;
404
405 if (IsNewerSequenceNumber(sequence_number, first_seq)) {
406 if (sequence_number < first_seq) {
407 // Forward wrap.
408 packet_index += kSeqNumSpan;
409 }
410 } else if (sequence_number > first_seq) {
411 // Backwards wrap.
412 packet_index -= kSeqNumSpan;
413 }
414
415 return packet_index;
416 }
417
GetStoredPacket(uint16_t sequence_number)418 RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket(
419 uint16_t sequence_number) {
420 int index = GetPacketIndex(sequence_number);
421 if (index < 0 || static_cast<size_t>(index) >= packet_history_.size() ||
422 packet_history_[index].packet_ == nullptr) {
423 return nullptr;
424 }
425 return &packet_history_[index];
426 }
427
428 } // namespace webrtc
429