xref: /aosp_15_r20/external/webrtc/modules/rtp_rtcp/source/rtp_packet_history.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/rtp_packet_history.h"
12 
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17 
18 #include "modules/include/module_common_types_public.h"
19 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
20 #include "rtc_base/checks.h"
21 #include "rtc_base/logging.h"
22 #include "system_wrappers/include/clock.h"
23 
24 namespace webrtc {
25 
StoredPacket(std::unique_ptr<RtpPacketToSend> packet,Timestamp send_time,uint64_t insert_order)26 RtpPacketHistory::StoredPacket::StoredPacket(
27     std::unique_ptr<RtpPacketToSend> packet,
28     Timestamp send_time,
29     uint64_t insert_order)
30     : packet_(std::move(packet)),
31       pending_transmission_(false),
32       send_time_(send_time),
33       insert_order_(insert_order),
34       times_retransmitted_(0) {}
35 
36 RtpPacketHistory::StoredPacket::StoredPacket(StoredPacket&&) = default;
37 RtpPacketHistory::StoredPacket& RtpPacketHistory::StoredPacket::operator=(
38     RtpPacketHistory::StoredPacket&&) = default;
39 RtpPacketHistory::StoredPacket::~StoredPacket() = default;
40 
IncrementTimesRetransmitted(PacketPrioritySet * priority_set)41 void RtpPacketHistory::StoredPacket::IncrementTimesRetransmitted(
42     PacketPrioritySet* priority_set) {
43   // Check if this StoredPacket is in the priority set. If so, we need to remove
44   // it before updating `times_retransmitted_` since that is used in sorting,
45   // and then add it back.
46   const bool in_priority_set = priority_set && priority_set->erase(this) > 0;
47   ++times_retransmitted_;
48   if (in_priority_set) {
49     auto it = priority_set->insert(this);
50     RTC_DCHECK(it.second)
51         << "ERROR: Priority set already contains matching packet! In set: "
52            "insert order = "
53         << (*it.first)->insert_order_
54         << ", times retransmitted = " << (*it.first)->times_retransmitted_
55         << ". Trying to add: insert order = " << insert_order_
56         << ", times retransmitted = " << times_retransmitted_;
57   }
58 }
59 
operator ()(StoredPacket * lhs,StoredPacket * rhs) const60 bool RtpPacketHistory::MoreUseful::operator()(StoredPacket* lhs,
61                                               StoredPacket* rhs) const {
62   // Prefer to send packets we haven't already sent as padding.
63   if (lhs->times_retransmitted() != rhs->times_retransmitted()) {
64     return lhs->times_retransmitted() < rhs->times_retransmitted();
65   }
66   // All else being equal, prefer newer packets.
67   return lhs->insert_order() > rhs->insert_order();
68 }
69 
RtpPacketHistory(Clock * clock,bool enable_padding_prio)70 RtpPacketHistory::RtpPacketHistory(Clock* clock, bool enable_padding_prio)
71     : clock_(clock),
72       enable_padding_prio_(enable_padding_prio),
73       number_to_store_(0),
74       mode_(StorageMode::kDisabled),
75       rtt_(TimeDelta::MinusInfinity()),
76       packets_inserted_(0) {}
77 
~RtpPacketHistory()78 RtpPacketHistory::~RtpPacketHistory() {}
79 
SetStorePacketsStatus(StorageMode mode,size_t number_to_store)80 void RtpPacketHistory::SetStorePacketsStatus(StorageMode mode,
81                                              size_t number_to_store) {
82   RTC_DCHECK_LE(number_to_store, kMaxCapacity);
83   MutexLock lock(&lock_);
84   if (mode != StorageMode::kDisabled && mode_ != StorageMode::kDisabled) {
85     RTC_LOG(LS_WARNING) << "Purging packet history in order to re-set status.";
86   }
87   Reset();
88   mode_ = mode;
89   number_to_store_ = std::min(kMaxCapacity, number_to_store);
90 }
91 
GetStorageMode() const92 RtpPacketHistory::StorageMode RtpPacketHistory::GetStorageMode() const {
93   MutexLock lock(&lock_);
94   return mode_;
95 }
96 
SetRtt(TimeDelta rtt)97 void RtpPacketHistory::SetRtt(TimeDelta rtt) {
98   MutexLock lock(&lock_);
99   RTC_DCHECK_GE(rtt, TimeDelta::Zero());
100   rtt_ = rtt;
101   // If storage is not disabled,  packets will be removed after a timeout
102   // that depends on the RTT. Changing the RTT may thus cause some packets
103   // become "old" and subject to removal.
104   if (mode_ != StorageMode::kDisabled) {
105     CullOldPackets();
106   }
107 }
108 
PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,Timestamp send_time)109 void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
110                                     Timestamp send_time) {
111   RTC_DCHECK(packet);
112   MutexLock lock(&lock_);
113   if (mode_ == StorageMode::kDisabled) {
114     return;
115   }
116 
117   RTC_DCHECK(packet->allow_retransmission());
118   CullOldPackets();
119 
120   // Store packet.
121   const uint16_t rtp_seq_no = packet->SequenceNumber();
122   int packet_index = GetPacketIndex(rtp_seq_no);
123   if (packet_index >= 0 &&
124       static_cast<size_t>(packet_index) < packet_history_.size() &&
125       packet_history_[packet_index].packet_ != nullptr) {
126     RTC_LOG(LS_WARNING) << "Duplicate packet inserted: " << rtp_seq_no;
127     // Remove previous packet to avoid inconsistent state.
128     RemovePacket(packet_index);
129     packet_index = GetPacketIndex(rtp_seq_no);
130   }
131 
132   // Packet to be inserted ahead of first packet, expand front.
133   for (; packet_index < 0; ++packet_index) {
134     packet_history_.emplace_front();
135   }
136   // Packet to be inserted behind last packet, expand back.
137   while (static_cast<int>(packet_history_.size()) <= packet_index) {
138     packet_history_.emplace_back();
139   }
140 
141   RTC_DCHECK_GE(packet_index, 0);
142   RTC_DCHECK_LT(packet_index, packet_history_.size());
143   RTC_DCHECK(packet_history_[packet_index].packet_ == nullptr);
144 
145   packet_history_[packet_index] =
146       StoredPacket(std::move(packet), send_time, packets_inserted_++);
147 
148   if (enable_padding_prio_) {
149     if (padding_priority_.size() >= kMaxPaddingHistory - 1) {
150       padding_priority_.erase(std::prev(padding_priority_.end()));
151     }
152     auto prio_it = padding_priority_.insert(&packet_history_[packet_index]);
153     RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set.";
154   }
155 }
156 
GetPacketAndMarkAsPending(uint16_t sequence_number)157 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
158     uint16_t sequence_number) {
159   return GetPacketAndMarkAsPending(
160       sequence_number, [](const RtpPacketToSend& packet) {
161         return std::make_unique<RtpPacketToSend>(packet);
162       });
163 }
164 
GetPacketAndMarkAsPending(uint16_t sequence_number,rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)165 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPacketAndMarkAsPending(
166     uint16_t sequence_number,
167     rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
168         encapsulate) {
169   MutexLock lock(&lock_);
170   if (mode_ == StorageMode::kDisabled) {
171     return nullptr;
172   }
173 
174   StoredPacket* packet = GetStoredPacket(sequence_number);
175   if (packet == nullptr) {
176     return nullptr;
177   }
178 
179   if (packet->pending_transmission_) {
180     // Packet already in pacer queue, ignore this request.
181     return nullptr;
182   }
183 
184   if (!VerifyRtt(*packet)) {
185     // Packet already resent within too short a time window, ignore.
186     return nullptr;
187   }
188 
189   // Copy and/or encapsulate packet.
190   std::unique_ptr<RtpPacketToSend> encapsulated_packet =
191       encapsulate(*packet->packet_);
192   if (encapsulated_packet) {
193     packet->pending_transmission_ = true;
194   }
195 
196   return encapsulated_packet;
197 }
198 
MarkPacketAsSent(uint16_t sequence_number)199 void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) {
200   MutexLock lock(&lock_);
201   if (mode_ == StorageMode::kDisabled) {
202     return;
203   }
204 
205   StoredPacket* packet = GetStoredPacket(sequence_number);
206   if (packet == nullptr) {
207     return;
208   }
209 
210   // Update send-time, mark as no longer in pacer queue, and increment
211   // transmission count.
212   packet->set_send_time(clock_->CurrentTime());
213   packet->pending_transmission_ = false;
214   packet->IncrementTimesRetransmitted(enable_padding_prio_ ? &padding_priority_
215                                                            : nullptr);
216 }
217 
GetPacketState(uint16_t sequence_number) const218 bool RtpPacketHistory::GetPacketState(uint16_t sequence_number) const {
219   MutexLock lock(&lock_);
220   if (mode_ == StorageMode::kDisabled) {
221     return false;
222   }
223 
224   int packet_index = GetPacketIndex(sequence_number);
225   if (packet_index < 0 ||
226       static_cast<size_t>(packet_index) >= packet_history_.size()) {
227     return false;
228   }
229   const StoredPacket& packet = packet_history_[packet_index];
230   if (packet.packet_ == nullptr) {
231     return false;
232   }
233 
234   if (!VerifyRtt(packet)) {
235     return false;
236   }
237 
238   return true;
239 }
240 
VerifyRtt(const RtpPacketHistory::StoredPacket & packet) const241 bool RtpPacketHistory::VerifyRtt(
242     const RtpPacketHistory::StoredPacket& packet) const {
243   if (packet.times_retransmitted() > 0 &&
244       clock_->CurrentTime() - packet.send_time() < rtt_) {
245     // This packet has already been retransmitted once, and the time since
246     // that even is lower than on RTT. Ignore request as this packet is
247     // likely already in the network pipe.
248     return false;
249   }
250 
251   return true;
252 }
253 
GetPayloadPaddingPacket()254 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket() {
255   // Default implementation always just returns a copy of the packet.
256   return GetPayloadPaddingPacket([](const RtpPacketToSend& packet) {
257     return std::make_unique<RtpPacketToSend>(packet);
258   });
259 }
260 
GetPayloadPaddingPacket(rtc::FunctionView<std::unique_ptr<RtpPacketToSend> (const RtpPacketToSend &)> encapsulate)261 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetPayloadPaddingPacket(
262     rtc::FunctionView<std::unique_ptr<RtpPacketToSend>(const RtpPacketToSend&)>
263         encapsulate) {
264   MutexLock lock(&lock_);
265   if (mode_ == StorageMode::kDisabled) {
266     return nullptr;
267   }
268 
269   StoredPacket* best_packet = nullptr;
270   if (enable_padding_prio_ && !padding_priority_.empty()) {
271     auto best_packet_it = padding_priority_.begin();
272     best_packet = *best_packet_it;
273   } else if (!enable_padding_prio_ && !packet_history_.empty()) {
274     // Prioritization not available, pick the last packet.
275     for (auto it = packet_history_.rbegin(); it != packet_history_.rend();
276          ++it) {
277       if (it->packet_ != nullptr) {
278         best_packet = &(*it);
279         break;
280       }
281     }
282   }
283   if (best_packet == nullptr) {
284     return nullptr;
285   }
286 
287   if (best_packet->pending_transmission_) {
288     // Because PacedSender releases it's lock when it calls
289     // GeneratePadding() there is the potential for a race where a new
290     // packet ends up here instead of the regular transmit path. In such a
291     // case, just return empty and it will be picked up on the next
292     // Process() call.
293     return nullptr;
294   }
295 
296   auto padding_packet = encapsulate(*best_packet->packet_);
297   if (!padding_packet) {
298     return nullptr;
299   }
300 
301   best_packet->set_send_time(clock_->CurrentTime());
302   best_packet->IncrementTimesRetransmitted(
303       enable_padding_prio_ ? &padding_priority_ : nullptr);
304 
305   return padding_packet;
306 }
307 
CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers)308 void RtpPacketHistory::CullAcknowledgedPackets(
309     rtc::ArrayView<const uint16_t> sequence_numbers) {
310   MutexLock lock(&lock_);
311   for (uint16_t sequence_number : sequence_numbers) {
312     int packet_index = GetPacketIndex(sequence_number);
313     if (packet_index < 0 ||
314         static_cast<size_t>(packet_index) >= packet_history_.size()) {
315       continue;
316     }
317     RemovePacket(packet_index);
318   }
319 }
320 
Clear()321 void RtpPacketHistory::Clear() {
322   MutexLock lock(&lock_);
323   Reset();
324 }
325 
Reset()326 void RtpPacketHistory::Reset() {
327   packet_history_.clear();
328   padding_priority_.clear();
329 }
330 
CullOldPackets()331 void RtpPacketHistory::CullOldPackets() {
332   Timestamp now = clock_->CurrentTime();
333   TimeDelta packet_duration =
334       rtt_.IsFinite()
335           ? std::max(kMinPacketDurationRtt * rtt_, kMinPacketDuration)
336           : kMinPacketDuration;
337   while (!packet_history_.empty()) {
338     if (packet_history_.size() >= kMaxCapacity) {
339       // We have reached the absolute max capacity, remove one packet
340       // unconditionally.
341       RemovePacket(0);
342       continue;
343     }
344 
345     const StoredPacket& stored_packet = packet_history_.front();
346     if (stored_packet.pending_transmission_) {
347       // Don't remove packets in the pacer queue, pending tranmission.
348       return;
349     }
350 
351     if (stored_packet.send_time() + packet_duration > now) {
352       // Don't cull packets too early to avoid failed retransmission requests.
353       return;
354     }
355 
356     if (packet_history_.size() >= number_to_store_ ||
357         stored_packet.send_time() +
358                 (packet_duration * kPacketCullingDelayFactor) <=
359             now) {
360       // Too many packets in history, or this packet has timed out. Remove it
361       // and continue.
362       RemovePacket(0);
363     } else {
364       // No more packets can be removed right now.
365       return;
366     }
367   }
368 }
369 
RemovePacket(int packet_index)370 std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket(
371     int packet_index) {
372   // Move the packet out from the StoredPacket container.
373   std::unique_ptr<RtpPacketToSend> rtp_packet =
374       std::move(packet_history_[packet_index].packet_);
375 
376   // Erase from padding priority set, if eligible.
377   if (enable_padding_prio_) {
378     padding_priority_.erase(&packet_history_[packet_index]);
379   }
380 
381   if (packet_index == 0) {
382     while (!packet_history_.empty() &&
383            packet_history_.front().packet_ == nullptr) {
384       packet_history_.pop_front();
385     }
386   }
387 
388   return rtp_packet;
389 }
390 
GetPacketIndex(uint16_t sequence_number) const391 int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const {
392   if (packet_history_.empty()) {
393     return 0;
394   }
395 
396   RTC_DCHECK(packet_history_.front().packet_ != nullptr);
397   int first_seq = packet_history_.front().packet_->SequenceNumber();
398   if (first_seq == sequence_number) {
399     return 0;
400   }
401 
402   int packet_index = sequence_number - first_seq;
403   constexpr int kSeqNumSpan = std::numeric_limits<uint16_t>::max() + 1;
404 
405   if (IsNewerSequenceNumber(sequence_number, first_seq)) {
406     if (sequence_number < first_seq) {
407       // Forward wrap.
408       packet_index += kSeqNumSpan;
409     }
410   } else if (sequence_number > first_seq) {
411     // Backwards wrap.
412     packet_index -= kSeqNumSpan;
413   }
414 
415   return packet_index;
416 }
417 
GetStoredPacket(uint16_t sequence_number)418 RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket(
419     uint16_t sequence_number) {
420   int index = GetPacketIndex(sequence_number);
421   if (index < 0 || static_cast<size_t>(index) >= packet_history_.size() ||
422       packet_history_[index].packet_ == nullptr) {
423     return nullptr;
424   }
425   return &packet_history_[index];
426 }
427 
428 }  // namespace webrtc
429