1 /*
2 * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/pacing/prioritized_packet_queue.h"
12
13 #include <utility>
14
15 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
16 #include "rtc_base/checks.h"
17
18 namespace webrtc {
19 namespace {
20
21 constexpr int kAudioPrioLevel = 0;
22
GetPriorityForType(RtpPacketMediaType type)23 int GetPriorityForType(RtpPacketMediaType type) {
24 // Lower number takes priority over higher.
25 switch (type) {
26 case RtpPacketMediaType::kAudio:
27 // Audio is always prioritized over other packet types.
28 return kAudioPrioLevel;
29 case RtpPacketMediaType::kRetransmission:
30 // Send retransmissions before new media.
31 return kAudioPrioLevel + 1;
32 case RtpPacketMediaType::kVideo:
33 case RtpPacketMediaType::kForwardErrorCorrection:
34 // Video has "normal" priority, in the old speak.
35 // Send redundancy concurrently to video. If it is delayed it might have a
36 // lower chance of being useful.
37 return kAudioPrioLevel + 2;
38 case RtpPacketMediaType::kPadding:
39 // Packets that are in themselves likely useless, only sent to keep the
40 // BWE high.
41 return kAudioPrioLevel + 3;
42 }
43 RTC_CHECK_NOTREACHED();
44 }
45
46 } // namespace
47
PacketSize() const48 DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const {
49 return DataSize::Bytes(packet->payload_size() + packet->padding_size());
50 }
51
StreamQueue(Timestamp creation_time)52 PrioritizedPacketQueue::StreamQueue::StreamQueue(Timestamp creation_time)
53 : last_enqueue_time_(creation_time) {}
54
EnqueuePacket(QueuedPacket packet,int priority_level)55 bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet,
56 int priority_level) {
57 bool first_packet_at_level = packets_[priority_level].empty();
58 packets_[priority_level].push_back(std::move(packet));
59 return first_packet_at_level;
60 }
61
62 PrioritizedPacketQueue::QueuedPacket
DequePacket(int priority_level)63 PrioritizedPacketQueue::StreamQueue::DequePacket(int priority_level) {
64 RTC_DCHECK(!packets_[priority_level].empty());
65 QueuedPacket packet = std::move(packets_[priority_level].front());
66 packets_[priority_level].pop_front();
67 return packet;
68 }
69
HasPacketsAtPrio(int priority_level) const70 bool PrioritizedPacketQueue::StreamQueue::HasPacketsAtPrio(
71 int priority_level) const {
72 return !packets_[priority_level].empty();
73 }
74
IsEmpty() const75 bool PrioritizedPacketQueue::StreamQueue::IsEmpty() const {
76 for (const std::deque<QueuedPacket>& queue : packets_) {
77 if (!queue.empty()) {
78 return false;
79 }
80 }
81 return true;
82 }
83
LeadingPacketEnqueueTime(int priority_level) const84 Timestamp PrioritizedPacketQueue::StreamQueue::LeadingPacketEnqueueTime(
85 int priority_level) const {
86 RTC_DCHECK(!packets_[priority_level].empty());
87 return packets_[priority_level].begin()->enqueue_time;
88 }
89
LastEnqueueTime() const90 Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const {
91 return last_enqueue_time_;
92 }
93
PrioritizedPacketQueue(Timestamp creation_time)94 PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time)
95 : queue_time_sum_(TimeDelta::Zero()),
96 pause_time_sum_(TimeDelta::Zero()),
97 size_packets_(0),
98 size_packets_per_media_type_({}),
99 size_payload_(DataSize::Zero()),
100 last_update_time_(creation_time),
101 paused_(false),
102 last_culling_time_(creation_time),
103 top_active_prio_level_(-1) {}
104
Push(Timestamp enqueue_time,std::unique_ptr<RtpPacketToSend> packet)105 void PrioritizedPacketQueue::Push(Timestamp enqueue_time,
106 std::unique_ptr<RtpPacketToSend> packet) {
107 StreamQueue* stream_queue;
108 auto [it, inserted] = streams_.emplace(packet->Ssrc(), nullptr);
109 if (inserted) {
110 it->second = std::make_unique<StreamQueue>(enqueue_time);
111 }
112 stream_queue = it->second.get();
113
114 auto enqueue_time_iterator =
115 enqueue_times_.insert(enqueue_times_.end(), enqueue_time);
116 RTC_DCHECK(packet->packet_type().has_value());
117 RtpPacketMediaType packet_type = packet->packet_type().value();
118 int prio_level = GetPriorityForType(packet_type);
119 RTC_DCHECK_GE(prio_level, 0);
120 RTC_DCHECK_LT(prio_level, kNumPriorityLevels);
121 QueuedPacket queued_packed = {.packet = std::move(packet),
122 .enqueue_time = enqueue_time,
123 .enqueue_time_iterator = enqueue_time_iterator};
124 // In order to figure out how much time a packet has spent in the queue
125 // while not in a paused state, we subtract the total amount of time the
126 // queue has been paused so far, and when the packet is popped we subtract
127 // the total amount of time the queue has been paused at that moment. This
128 // way we subtract the total amount of time the packet has spent in the
129 // queue while in a paused state.
130 UpdateAverageQueueTime(enqueue_time);
131 queued_packed.enqueue_time -= pause_time_sum_;
132 ++size_packets_;
133 ++size_packets_per_media_type_[static_cast<size_t>(packet_type)];
134 size_payload_ += queued_packed.PacketSize();
135
136 if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) {
137 // Number packets at `prio_level` for this steam is now non-zero.
138 streams_by_prio_[prio_level].push_back(stream_queue);
139 }
140 if (top_active_prio_level_ < 0 || prio_level < top_active_prio_level_) {
141 top_active_prio_level_ = prio_level;
142 }
143
144 static constexpr TimeDelta kTimeout = TimeDelta::Millis(500);
145 if (enqueue_time - last_culling_time_ > kTimeout) {
146 for (auto it = streams_.begin(); it != streams_.end();) {
147 if (it->second->IsEmpty() &&
148 it->second->LastEnqueueTime() + kTimeout < enqueue_time) {
149 streams_.erase(it++);
150 } else {
151 ++it;
152 }
153 }
154 last_culling_time_ = enqueue_time;
155 }
156 }
157
Pop()158 std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {
159 if (size_packets_ == 0) {
160 return nullptr;
161 }
162
163 RTC_DCHECK_GE(top_active_prio_level_, 0);
164 StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
165 QueuedPacket packet = stream_queue.DequePacket(top_active_prio_level_);
166 --size_packets_;
167 RTC_DCHECK(packet.packet->packet_type().has_value());
168 RtpPacketMediaType packet_type = packet.packet->packet_type().value();
169 --size_packets_per_media_type_[static_cast<size_t>(packet_type)];
170 RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
171 0);
172 size_payload_ -= packet.PacketSize();
173
174 // Calculate the total amount of time spent by this packet in the queue
175 // while in a non-paused state. Note that the `pause_time_sum_ms_` was
176 // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
177 // by subtracting it now we effectively remove the time spent in in the
178 // queue while in a paused state.
179 TimeDelta time_in_non_paused_state =
180 last_update_time_ - packet.enqueue_time - pause_time_sum_;
181 queue_time_sum_ -= time_in_non_paused_state;
182
183 // Set the time spent in the send queue, which is the per-packet equivalent of
184 // totalPacketSendDelay. The notion of being paused is an implementation
185 // detail that we do not want to expose, so it makes sense to report the
186 // metric excluding the pause time. This also avoids spikes in the metric.
187 // https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
188 packet.packet->set_time_in_send_queue(time_in_non_paused_state);
189
190 RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
191
192 RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
193 enqueue_times_.erase(packet.enqueue_time_iterator);
194
195 // Remove StreamQueue from head of fifo-queue for this prio level, and
196 // and add it to the end if it still has packets.
197 streams_by_prio_[top_active_prio_level_].pop_front();
198 if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
199 streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
200 } else if (streams_by_prio_[top_active_prio_level_].empty()) {
201 // No stream queues have packets at this prio level, find top priority
202 // that is not empty.
203 if (size_packets_ == 0) {
204 top_active_prio_level_ = -1;
205 } else {
206 for (int i = 0; i < kNumPriorityLevels; ++i) {
207 if (!streams_by_prio_[i].empty()) {
208 top_active_prio_level_ = i;
209 break;
210 }
211 }
212 }
213 }
214
215 return std::move(packet.packet);
216 }
217
SizeInPackets() const218 int PrioritizedPacketQueue::SizeInPackets() const {
219 return size_packets_;
220 }
221
SizeInPayloadBytes() const222 DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const {
223 return size_payload_;
224 }
225
Empty() const226 bool PrioritizedPacketQueue::Empty() const {
227 return size_packets_ == 0;
228 }
229
230 const std::array<int, kNumMediaTypes>&
SizeInPacketsPerRtpPacketMediaType() const231 PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
232 return size_packets_per_media_type_;
233 }
234
LeadingPacketEnqueueTime(RtpPacketMediaType type) const235 Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
236 RtpPacketMediaType type) const {
237 const int priority_level = GetPriorityForType(type);
238 if (streams_by_prio_[priority_level].empty()) {
239 return Timestamp::MinusInfinity();
240 }
241 return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
242 priority_level);
243 }
244
OldestEnqueueTime() const245 Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const {
246 return enqueue_times_.empty() ? Timestamp::MinusInfinity()
247 : enqueue_times_.front();
248 }
249
AverageQueueTime() const250 TimeDelta PrioritizedPacketQueue::AverageQueueTime() const {
251 if (size_packets_ == 0) {
252 return TimeDelta::Zero();
253 }
254 return queue_time_sum_ / size_packets_;
255 }
256
UpdateAverageQueueTime(Timestamp now)257 void PrioritizedPacketQueue::UpdateAverageQueueTime(Timestamp now) {
258 RTC_CHECK_GE(now, last_update_time_);
259 if (now == last_update_time_) {
260 return;
261 }
262
263 TimeDelta delta = now - last_update_time_;
264
265 if (paused_) {
266 pause_time_sum_ += delta;
267 } else {
268 queue_time_sum_ += delta * size_packets_;
269 }
270
271 last_update_time_ = now;
272 }
273
SetPauseState(bool paused,Timestamp now)274 void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) {
275 UpdateAverageQueueTime(now);
276 paused_ = paused;
277 }
278
279 } // namespace webrtc
280