xref: /aosp_15_r20/external/webrtc/modules/pacing/prioritized_packet_queue.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/prioritized_packet_queue.h"
12 
13 #include <utility>
14 
15 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
16 #include "rtc_base/checks.h"
17 
18 namespace webrtc {
19 namespace {
20 
21 constexpr int kAudioPrioLevel = 0;
22 
GetPriorityForType(RtpPacketMediaType type)23 int GetPriorityForType(RtpPacketMediaType type) {
24   // Lower number takes priority over higher.
25   switch (type) {
26     case RtpPacketMediaType::kAudio:
27       // Audio is always prioritized over other packet types.
28       return kAudioPrioLevel;
29     case RtpPacketMediaType::kRetransmission:
30       // Send retransmissions before new media.
31       return kAudioPrioLevel + 1;
32     case RtpPacketMediaType::kVideo:
33     case RtpPacketMediaType::kForwardErrorCorrection:
34       // Video has "normal" priority, in the old speak.
35       // Send redundancy concurrently to video. If it is delayed it might have a
36       // lower chance of being useful.
37       return kAudioPrioLevel + 2;
38     case RtpPacketMediaType::kPadding:
39       // Packets that are in themselves likely useless, only sent to keep the
40       // BWE high.
41       return kAudioPrioLevel + 3;
42   }
43   RTC_CHECK_NOTREACHED();
44 }
45 
46 }  // namespace
47 
PacketSize() const48 DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const {
49   return DataSize::Bytes(packet->payload_size() + packet->padding_size());
50 }
51 
StreamQueue(Timestamp creation_time)52 PrioritizedPacketQueue::StreamQueue::StreamQueue(Timestamp creation_time)
53     : last_enqueue_time_(creation_time) {}
54 
EnqueuePacket(QueuedPacket packet,int priority_level)55 bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet,
56                                                         int priority_level) {
57   bool first_packet_at_level = packets_[priority_level].empty();
58   packets_[priority_level].push_back(std::move(packet));
59   return first_packet_at_level;
60 }
61 
62 PrioritizedPacketQueue::QueuedPacket
DequePacket(int priority_level)63 PrioritizedPacketQueue::StreamQueue::DequePacket(int priority_level) {
64   RTC_DCHECK(!packets_[priority_level].empty());
65   QueuedPacket packet = std::move(packets_[priority_level].front());
66   packets_[priority_level].pop_front();
67   return packet;
68 }
69 
HasPacketsAtPrio(int priority_level) const70 bool PrioritizedPacketQueue::StreamQueue::HasPacketsAtPrio(
71     int priority_level) const {
72   return !packets_[priority_level].empty();
73 }
74 
IsEmpty() const75 bool PrioritizedPacketQueue::StreamQueue::IsEmpty() const {
76   for (const std::deque<QueuedPacket>& queue : packets_) {
77     if (!queue.empty()) {
78       return false;
79     }
80   }
81   return true;
82 }
83 
LeadingPacketEnqueueTime(int priority_level) const84 Timestamp PrioritizedPacketQueue::StreamQueue::LeadingPacketEnqueueTime(
85     int priority_level) const {
86   RTC_DCHECK(!packets_[priority_level].empty());
87   return packets_[priority_level].begin()->enqueue_time;
88 }
89 
LastEnqueueTime() const90 Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const {
91   return last_enqueue_time_;
92 }
93 
PrioritizedPacketQueue(Timestamp creation_time)94 PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time)
95     : queue_time_sum_(TimeDelta::Zero()),
96       pause_time_sum_(TimeDelta::Zero()),
97       size_packets_(0),
98       size_packets_per_media_type_({}),
99       size_payload_(DataSize::Zero()),
100       last_update_time_(creation_time),
101       paused_(false),
102       last_culling_time_(creation_time),
103       top_active_prio_level_(-1) {}
104 
Push(Timestamp enqueue_time,std::unique_ptr<RtpPacketToSend> packet)105 void PrioritizedPacketQueue::Push(Timestamp enqueue_time,
106                                   std::unique_ptr<RtpPacketToSend> packet) {
107   StreamQueue* stream_queue;
108   auto [it, inserted] = streams_.emplace(packet->Ssrc(), nullptr);
109   if (inserted) {
110     it->second = std::make_unique<StreamQueue>(enqueue_time);
111   }
112   stream_queue = it->second.get();
113 
114   auto enqueue_time_iterator =
115       enqueue_times_.insert(enqueue_times_.end(), enqueue_time);
116   RTC_DCHECK(packet->packet_type().has_value());
117   RtpPacketMediaType packet_type = packet->packet_type().value();
118   int prio_level = GetPriorityForType(packet_type);
119   RTC_DCHECK_GE(prio_level, 0);
120   RTC_DCHECK_LT(prio_level, kNumPriorityLevels);
121   QueuedPacket queued_packed = {.packet = std::move(packet),
122                                 .enqueue_time = enqueue_time,
123                                 .enqueue_time_iterator = enqueue_time_iterator};
124   // In order to figure out how much time a packet has spent in the queue
125   // while not in a paused state, we subtract the total amount of time the
126   // queue has been paused so far, and when the packet is popped we subtract
127   // the total amount of time the queue has been paused at that moment. This
128   // way we subtract the total amount of time the packet has spent in the
129   // queue while in a paused state.
130   UpdateAverageQueueTime(enqueue_time);
131   queued_packed.enqueue_time -= pause_time_sum_;
132   ++size_packets_;
133   ++size_packets_per_media_type_[static_cast<size_t>(packet_type)];
134   size_payload_ += queued_packed.PacketSize();
135 
136   if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) {
137     // Number packets at `prio_level` for this steam is now non-zero.
138     streams_by_prio_[prio_level].push_back(stream_queue);
139   }
140   if (top_active_prio_level_ < 0 || prio_level < top_active_prio_level_) {
141     top_active_prio_level_ = prio_level;
142   }
143 
144   static constexpr TimeDelta kTimeout = TimeDelta::Millis(500);
145   if (enqueue_time - last_culling_time_ > kTimeout) {
146     for (auto it = streams_.begin(); it != streams_.end();) {
147       if (it->second->IsEmpty() &&
148           it->second->LastEnqueueTime() + kTimeout < enqueue_time) {
149         streams_.erase(it++);
150       } else {
151         ++it;
152       }
153     }
154     last_culling_time_ = enqueue_time;
155   }
156 }
157 
Pop()158 std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {
159   if (size_packets_ == 0) {
160     return nullptr;
161   }
162 
163   RTC_DCHECK_GE(top_active_prio_level_, 0);
164   StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
165   QueuedPacket packet = stream_queue.DequePacket(top_active_prio_level_);
166   --size_packets_;
167   RTC_DCHECK(packet.packet->packet_type().has_value());
168   RtpPacketMediaType packet_type = packet.packet->packet_type().value();
169   --size_packets_per_media_type_[static_cast<size_t>(packet_type)];
170   RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
171                 0);
172   size_payload_ -= packet.PacketSize();
173 
174   // Calculate the total amount of time spent by this packet in the queue
175   // while in a non-paused state. Note that the `pause_time_sum_ms_` was
176   // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
177   // by subtracting it now we effectively remove the time spent in in the
178   // queue while in a paused state.
179   TimeDelta time_in_non_paused_state =
180       last_update_time_ - packet.enqueue_time - pause_time_sum_;
181   queue_time_sum_ -= time_in_non_paused_state;
182 
183   // Set the time spent in the send queue, which is the per-packet equivalent of
184   // totalPacketSendDelay. The notion of being paused is an implementation
185   // detail that we do not want to expose, so it makes sense to report the
186   // metric excluding the pause time. This also avoids spikes in the metric.
187   // https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
188   packet.packet->set_time_in_send_queue(time_in_non_paused_state);
189 
190   RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
191 
192   RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
193   enqueue_times_.erase(packet.enqueue_time_iterator);
194 
195   // Remove StreamQueue from head of fifo-queue for this prio level, and
196   // and add it to the end if it still has packets.
197   streams_by_prio_[top_active_prio_level_].pop_front();
198   if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
199     streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
200   } else if (streams_by_prio_[top_active_prio_level_].empty()) {
201     // No stream queues have packets at this prio level, find top priority
202     // that is not empty.
203     if (size_packets_ == 0) {
204       top_active_prio_level_ = -1;
205     } else {
206       for (int i = 0; i < kNumPriorityLevels; ++i) {
207         if (!streams_by_prio_[i].empty()) {
208           top_active_prio_level_ = i;
209           break;
210         }
211       }
212     }
213   }
214 
215   return std::move(packet.packet);
216 }
217 
SizeInPackets() const218 int PrioritizedPacketQueue::SizeInPackets() const {
219   return size_packets_;
220 }
221 
SizeInPayloadBytes() const222 DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const {
223   return size_payload_;
224 }
225 
Empty() const226 bool PrioritizedPacketQueue::Empty() const {
227   return size_packets_ == 0;
228 }
229 
230 const std::array<int, kNumMediaTypes>&
SizeInPacketsPerRtpPacketMediaType() const231 PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
232   return size_packets_per_media_type_;
233 }
234 
LeadingPacketEnqueueTime(RtpPacketMediaType type) const235 Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
236     RtpPacketMediaType type) const {
237   const int priority_level = GetPriorityForType(type);
238   if (streams_by_prio_[priority_level].empty()) {
239     return Timestamp::MinusInfinity();
240   }
241   return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
242       priority_level);
243 }
244 
OldestEnqueueTime() const245 Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const {
246   return enqueue_times_.empty() ? Timestamp::MinusInfinity()
247                                 : enqueue_times_.front();
248 }
249 
AverageQueueTime() const250 TimeDelta PrioritizedPacketQueue::AverageQueueTime() const {
251   if (size_packets_ == 0) {
252     return TimeDelta::Zero();
253   }
254   return queue_time_sum_ / size_packets_;
255 }
256 
UpdateAverageQueueTime(Timestamp now)257 void PrioritizedPacketQueue::UpdateAverageQueueTime(Timestamp now) {
258   RTC_CHECK_GE(now, last_update_time_);
259   if (now == last_update_time_) {
260     return;
261   }
262 
263   TimeDelta delta = now - last_update_time_;
264 
265   if (paused_) {
266     pause_time_sum_ += delta;
267   } else {
268     queue_time_sum_ += delta * size_packets_;
269   }
270 
271   last_update_time_ = now;
272 }
273 
SetPauseState(bool paused,Timestamp now)274 void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) {
275   UpdateAverageQueueTime(now);
276   paused_ = paused;
277 }
278 
279 }  // namespace webrtc
280