1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/rtp_sender_egress.h"
12
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17
18 #include "absl/strings/match.h"
19 #include "api/transport/field_trial_based_config.h"
20 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
21 #include "rtc_base/logging.h"
22
23 namespace webrtc {
24 namespace {
25 constexpr uint32_t kTimestampTicksPerMs = 90;
26 constexpr int kSendSideDelayWindowMs = 1000;
27 constexpr int kBitrateStatisticsWindowMs = 1000;
28 constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;
29 constexpr TimeDelta kUpdateInterval =
30 TimeDelta::Millis(kBitrateStatisticsWindowMs);
31 } // namespace
32
NonPacedPacketSender(RtpSenderEgress * sender,PacketSequencer * sequencer)33 RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
34 RtpSenderEgress* sender,
35 PacketSequencer* sequencer)
36 : transport_sequence_number_(0), sender_(sender), sequencer_(sequencer) {
37 RTC_DCHECK(sequencer);
38 }
39 RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() = default;
40
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)41 void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
42 std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
43 for (auto& packet : packets) {
44 PrepareForSend(packet.get());
45 sender_->SendPacket(packet.get(), PacedPacketInfo());
46 }
47 auto fec_packets = sender_->FetchFecPackets();
48 if (!fec_packets.empty()) {
49 EnqueuePackets(std::move(fec_packets));
50 }
51 }
52
PrepareForSend(RtpPacketToSend * packet)53 void RtpSenderEgress::NonPacedPacketSender::PrepareForSend(
54 RtpPacketToSend* packet) {
55 // Assign sequence numbers, but not for flexfec which is already running on
56 // an internally maintained sequence number series.
57 if (packet->Ssrc() != sender_->FlexFecSsrc()) {
58 sequencer_->Sequence(*packet);
59 }
60 if (!packet->SetExtension<TransportSequenceNumber>(
61 ++transport_sequence_number_)) {
62 --transport_sequence_number_;
63 }
64 packet->ReserveExtension<TransmissionOffset>();
65 packet->ReserveExtension<AbsoluteSendTime>();
66 }
67
RtpSenderEgress(const RtpRtcpInterface::Configuration & config,RtpPacketHistory * packet_history)68 RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
69 RtpPacketHistory* packet_history)
70 : worker_queue_(TaskQueueBase::Current()),
71 ssrc_(config.local_media_ssrc),
72 rtx_ssrc_(config.rtx_send_ssrc),
73 flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
74 : absl::nullopt),
75 populate_network2_timestamp_(config.populate_network2_timestamp),
76 clock_(config.clock),
77 packet_history_(packet_history),
78 transport_(config.outgoing_transport),
79 event_log_(config.event_log),
80 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
81 is_audio_(config.audio),
82 #endif
83 need_rtp_packet_infos_(config.need_rtp_packet_infos),
84 fec_generator_(config.fec_generator),
85 transport_feedback_observer_(config.transport_feedback_callback),
86 send_side_delay_observer_(config.send_side_delay_observer),
87 send_packet_observer_(config.send_packet_observer),
88 rtp_stats_callback_(config.rtp_stats_callback),
89 bitrate_callback_(config.send_bitrate_observer),
90 media_has_been_sent_(false),
91 force_part_of_allocation_(false),
92 timestamp_offset_(0),
93 max_delay_it_(send_delays_.end()),
94 sum_delays_ms_(0),
95 send_rates_(kNumMediaTypes,
96 {kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}),
97 rtp_sequence_number_map_(need_rtp_packet_infos_
98 ? std::make_unique<RtpSequenceNumberMap>(
99 kRtpSequenceNumberMapMaxEntries)
100 : nullptr) {
101 RTC_DCHECK(worker_queue_);
102 pacer_checker_.Detach();
103 if (bitrate_callback_) {
104 update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
__anona0a773940202() 105 kUpdateInterval, [this]() {
106 PeriodicUpdate();
107 return kUpdateInterval;
108 });
109 }
110 }
111
~RtpSenderEgress()112 RtpSenderEgress::~RtpSenderEgress() {
113 RTC_DCHECK_RUN_ON(worker_queue_);
114 update_task_.Stop();
115 }
116
SendPacket(RtpPacketToSend * packet,const PacedPacketInfo & pacing_info)117 void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
118 const PacedPacketInfo& pacing_info) {
119 RTC_DCHECK_RUN_ON(&pacer_checker_);
120 RTC_DCHECK(packet);
121
122 if (packet->Ssrc() == ssrc_ &&
123 packet->packet_type() != RtpPacketMediaType::kRetransmission) {
124 if (last_sent_seq_.has_value()) {
125 RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_seq_ + 1),
126 packet->SequenceNumber());
127 }
128 last_sent_seq_ = packet->SequenceNumber();
129 } else if (packet->Ssrc() == rtx_ssrc_) {
130 if (last_sent_rtx_seq_.has_value()) {
131 RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_rtx_seq_ + 1),
132 packet->SequenceNumber());
133 }
134 last_sent_rtx_seq_ = packet->SequenceNumber();
135 }
136
137 RTC_DCHECK(packet->packet_type().has_value());
138 RTC_DCHECK(HasCorrectSsrc(*packet));
139 if (packet->packet_type() == RtpPacketMediaType::kRetransmission) {
140 RTC_DCHECK(packet->retransmitted_sequence_number().has_value());
141 }
142
143 const uint32_t packet_ssrc = packet->Ssrc();
144 const Timestamp now = clock_->CurrentTime();
145
146 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
147 worker_queue_->PostTask(
148 SafeTask(task_safety_.flag(), [this, now, packet_ssrc]() {
149 BweTestLoggingPlot(now.ms(), packet_ssrc);
150 }));
151 #endif
152
153 if (need_rtp_packet_infos_ &&
154 packet->packet_type() == RtpPacketToSend::Type::kVideo) {
155 worker_queue_->PostTask(SafeTask(
156 task_safety_.flag(),
157 [this, packet_timestamp = packet->Timestamp(),
158 is_first_packet_of_frame = packet->is_first_packet_of_frame(),
159 is_last_packet_of_frame = packet->Marker(),
160 sequence_number = packet->SequenceNumber()]() {
161 RTC_DCHECK_RUN_ON(worker_queue_);
162 // Last packet of a frame, add it to sequence number info map.
163 const uint32_t timestamp = packet_timestamp - timestamp_offset_;
164 rtp_sequence_number_map_->InsertPacket(
165 sequence_number,
166 RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
167 is_last_packet_of_frame));
168 }));
169 }
170
171 if (fec_generator_ && packet->fec_protect_packet()) {
172 // This packet should be protected by FEC, add it to packet generator.
173 RTC_DCHECK(fec_generator_);
174 RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo);
175 absl::optional<std::pair<FecProtectionParams, FecProtectionParams>>
176 new_fec_params;
177 {
178 MutexLock lock(&lock_);
179 new_fec_params.swap(pending_fec_params_);
180 }
181 if (new_fec_params) {
182 fec_generator_->SetProtectionParameters(new_fec_params->first,
183 new_fec_params->second);
184 }
185 if (packet->is_red()) {
186 RtpPacketToSend unpacked_packet(*packet);
187
188 const rtc::CopyOnWriteBuffer buffer = packet->Buffer();
189 // Grab media payload type from RED header.
190 const size_t headers_size = packet->headers_size();
191 unpacked_packet.SetPayloadType(buffer[headers_size]);
192
193 // Copy the media payload into the unpacked buffer.
194 uint8_t* payload_buffer =
195 unpacked_packet.SetPayloadSize(packet->payload_size() - 1);
196 std::copy(&packet->payload()[0] + 1,
197 &packet->payload()[0] + packet->payload_size(), payload_buffer);
198
199 fec_generator_->AddPacketAndGenerateFec(unpacked_packet);
200 } else {
201 // If not RED encapsulated - we can just insert packet directly.
202 fec_generator_->AddPacketAndGenerateFec(*packet);
203 }
204 }
205
206 // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
207 // the pacer, these modifications of the header below are happening after the
208 // FEC protection packets are calculated. This will corrupt recovered packets
209 // at the same place. It's not an issue for extensions, which are present in
210 // all the packets (their content just may be incorrect on recovered packets).
211 // In case of VideoTimingExtension, since it's present not in every packet,
212 // data after rtp header may be corrupted if these packets are protected by
213 // the FEC.
214 TimeDelta diff = now - packet->capture_time();
215 if (packet->HasExtension<TransmissionOffset>()) {
216 packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff.ms());
217 }
218 if (packet->HasExtension<AbsoluteSendTime>()) {
219 packet->SetExtension<AbsoluteSendTime>(AbsoluteSendTime::To24Bits(now));
220 }
221
222 if (packet->HasExtension<VideoTimingExtension>()) {
223 if (populate_network2_timestamp_) {
224 packet->set_network2_time(now);
225 } else {
226 packet->set_pacer_exit_time(now);
227 }
228 }
229
230 const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
231 packet->packet_type() == RtpPacketMediaType::kVideo;
232
233 PacketOptions options;
234 {
235 MutexLock lock(&lock_);
236 options.included_in_allocation = force_part_of_allocation_;
237 }
238
239 // Downstream code actually uses this flag to distinguish between media and
240 // everything else.
241 options.is_retransmit = !is_media;
242 if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
243 options.packet_id = *packet_id;
244 options.included_in_feedback = true;
245 options.included_in_allocation = true;
246 AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
247 }
248
249 options.additional_data = packet->additional_data();
250
251 if (packet->packet_type() != RtpPacketMediaType::kPadding &&
252 packet->packet_type() != RtpPacketMediaType::kRetransmission) {
253 UpdateDelayStatistics(packet->capture_time().ms(), now.ms(), packet_ssrc);
254 UpdateOnSendPacket(options.packet_id, packet->capture_time().ms(),
255 packet_ssrc);
256 }
257
258 const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
259
260 // Put packet in retransmission history or update pending status even if
261 // actual sending fails.
262 if (is_media && packet->allow_retransmission()) {
263 packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
264 now);
265 } else if (packet->retransmitted_sequence_number()) {
266 packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
267 }
268
269 if (send_success) {
270 // `media_has_been_sent_` is used by RTPSender to figure out if it can send
271 // padding in the absence of transport-cc or abs-send-time.
272 // In those cases media must be sent first to set a reference timestamp.
273 media_has_been_sent_ = true;
274
275 // TODO(sprang): Add support for FEC protecting all header extensions, add
276 // media packet to generator here instead.
277
278 RTC_DCHECK(packet->packet_type().has_value());
279 RtpPacketMediaType packet_type = *packet->packet_type();
280 RtpPacketCounter counter(*packet);
281 size_t size = packet->size();
282 worker_queue_->PostTask(
283 SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type,
284 counter = std::move(counter), size]() {
285 RTC_DCHECK_RUN_ON(worker_queue_);
286 UpdateRtpStats(now.ms(), packet_ssrc, packet_type, std::move(counter),
287 size);
288 }));
289 }
290 }
291
GetSendRates() const292 RtpSendRates RtpSenderEgress::GetSendRates() const {
293 MutexLock lock(&lock_);
294 const int64_t now_ms = clock_->TimeInMilliseconds();
295 return GetSendRatesLocked(now_ms);
296 }
297
GetSendRatesLocked(int64_t now_ms) const298 RtpSendRates RtpSenderEgress::GetSendRatesLocked(int64_t now_ms) const {
299 RtpSendRates current_rates;
300 for (size_t i = 0; i < kNumMediaTypes; ++i) {
301 RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
302 current_rates[type] =
303 DataRate::BitsPerSec(send_rates_[i].Rate(now_ms).value_or(0));
304 }
305 return current_rates;
306 }
307
GetDataCounters(StreamDataCounters * rtp_stats,StreamDataCounters * rtx_stats) const308 void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
309 StreamDataCounters* rtx_stats) const {
310 // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
311 // only touched on the worker thread.
312 MutexLock lock(&lock_);
313 *rtp_stats = rtp_stats_;
314 *rtx_stats = rtx_rtp_stats_;
315 }
316
ForceIncludeSendPacketsInAllocation(bool part_of_allocation)317 void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
318 bool part_of_allocation) {
319 MutexLock lock(&lock_);
320 force_part_of_allocation_ = part_of_allocation;
321 }
322
MediaHasBeenSent() const323 bool RtpSenderEgress::MediaHasBeenSent() const {
324 RTC_DCHECK_RUN_ON(&pacer_checker_);
325 return media_has_been_sent_;
326 }
327
SetMediaHasBeenSent(bool media_sent)328 void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
329 RTC_DCHECK_RUN_ON(&pacer_checker_);
330 media_has_been_sent_ = media_sent;
331 }
332
SetTimestampOffset(uint32_t timestamp)333 void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
334 RTC_DCHECK_RUN_ON(worker_queue_);
335 timestamp_offset_ = timestamp;
336 }
337
GetSentRtpPacketInfos(rtc::ArrayView<const uint16_t> sequence_numbers) const338 std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
339 rtc::ArrayView<const uint16_t> sequence_numbers) const {
340 RTC_DCHECK_RUN_ON(worker_queue_);
341 RTC_DCHECK(!sequence_numbers.empty());
342 if (!need_rtp_packet_infos_) {
343 return std::vector<RtpSequenceNumberMap::Info>();
344 }
345
346 std::vector<RtpSequenceNumberMap::Info> results;
347 results.reserve(sequence_numbers.size());
348
349 for (uint16_t sequence_number : sequence_numbers) {
350 const auto& info = rtp_sequence_number_map_->Get(sequence_number);
351 if (!info) {
352 // The empty vector will be returned. We can delay the clearing
353 // of the vector until after we exit the critical section.
354 return std::vector<RtpSequenceNumberMap::Info>();
355 }
356 results.push_back(*info);
357 }
358
359 return results;
360 }
361
SetFecProtectionParameters(const FecProtectionParams & delta_params,const FecProtectionParams & key_params)362 void RtpSenderEgress::SetFecProtectionParameters(
363 const FecProtectionParams& delta_params,
364 const FecProtectionParams& key_params) {
365 // TODO(sprang): Post task to pacer queue instead, one pacer is fully
366 // migrated to a task queue.
367 MutexLock lock(&lock_);
368 pending_fec_params_.emplace(delta_params, key_params);
369 }
370
371 std::vector<std::unique_ptr<RtpPacketToSend>>
FetchFecPackets()372 RtpSenderEgress::FetchFecPackets() {
373 RTC_DCHECK_RUN_ON(&pacer_checker_);
374 if (fec_generator_) {
375 return fec_generator_->GetFecPackets();
376 }
377 return {};
378 }
379
OnAbortedRetransmissions(rtc::ArrayView<const uint16_t> sequence_numbers)380 void RtpSenderEgress::OnAbortedRetransmissions(
381 rtc::ArrayView<const uint16_t> sequence_numbers) {
382 RTC_DCHECK_RUN_ON(&pacer_checker_);
383 // Mark aborted retransmissions as sent, rather than leaving them in
384 // a 'pending' state - otherwise they can not be requested again and
385 // will not be cleared until the history has reached its max size.
386 for (uint16_t seq_no : sequence_numbers) {
387 packet_history_->MarkPacketAsSent(seq_no);
388 }
389 }
390
HasCorrectSsrc(const RtpPacketToSend & packet) const391 bool RtpSenderEgress::HasCorrectSsrc(const RtpPacketToSend& packet) const {
392 switch (*packet.packet_type()) {
393 case RtpPacketMediaType::kAudio:
394 case RtpPacketMediaType::kVideo:
395 return packet.Ssrc() == ssrc_;
396 case RtpPacketMediaType::kRetransmission:
397 case RtpPacketMediaType::kPadding:
398 // Both padding and retransmission must be on either the media or the
399 // RTX stream.
400 return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_;
401 case RtpPacketMediaType::kForwardErrorCorrection:
402 // FlexFEC is on separate SSRC, ULPFEC uses media SSRC.
403 return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_;
404 }
405 return false;
406 }
407
AddPacketToTransportFeedback(uint16_t packet_id,const RtpPacketToSend & packet,const PacedPacketInfo & pacing_info)408 void RtpSenderEgress::AddPacketToTransportFeedback(
409 uint16_t packet_id,
410 const RtpPacketToSend& packet,
411 const PacedPacketInfo& pacing_info) {
412 if (transport_feedback_observer_) {
413 RtpPacketSendInfo packet_info;
414 packet_info.transport_sequence_number = packet_id;
415 packet_info.rtp_timestamp = packet.Timestamp();
416 packet_info.length = packet.size();
417 packet_info.pacing_info = pacing_info;
418 packet_info.packet_type = packet.packet_type();
419
420 switch (*packet_info.packet_type) {
421 case RtpPacketMediaType::kAudio:
422 case RtpPacketMediaType::kVideo:
423 packet_info.media_ssrc = ssrc_;
424 packet_info.rtp_sequence_number = packet.SequenceNumber();
425 break;
426 case RtpPacketMediaType::kRetransmission:
427 // For retransmissions, we're want to remove the original media packet
428 // if the retransmit arrives - so populate that in the packet info.
429 packet_info.media_ssrc = ssrc_;
430 packet_info.rtp_sequence_number =
431 *packet.retransmitted_sequence_number();
432 break;
433 case RtpPacketMediaType::kPadding:
434 case RtpPacketMediaType::kForwardErrorCorrection:
435 // We're not interested in feedback about these packets being received
436 // or lost.
437 break;
438 }
439
440 transport_feedback_observer_->OnAddPacket(packet_info);
441 }
442 }
443
UpdateDelayStatistics(int64_t capture_time_ms,int64_t now_ms,uint32_t ssrc)444 void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
445 int64_t now_ms,
446 uint32_t ssrc) {
447 if (!send_side_delay_observer_ || capture_time_ms <= 0)
448 return;
449
450 int avg_delay_ms = 0;
451 int max_delay_ms = 0;
452 {
453 MutexLock lock(&lock_);
454 // Compute the max and average of the recent capture-to-send delays.
455 // The time complexity of the current approach depends on the distribution
456 // of the delay values. This could be done more efficiently.
457
458 // Remove elements older than kSendSideDelayWindowMs.
459 auto lower_bound =
460 send_delays_.lower_bound(now_ms - kSendSideDelayWindowMs);
461 for (auto it = send_delays_.begin(); it != lower_bound; ++it) {
462 if (max_delay_it_ == it) {
463 max_delay_it_ = send_delays_.end();
464 }
465 sum_delays_ms_ -= it->second;
466 }
467 send_delays_.erase(send_delays_.begin(), lower_bound);
468 if (max_delay_it_ == send_delays_.end()) {
469 // Removed the previous max. Need to recompute.
470 RecomputeMaxSendDelay();
471 }
472
473 // Add the new element.
474 RTC_DCHECK_GE(now_ms, 0);
475 RTC_DCHECK_LE(now_ms, std::numeric_limits<int64_t>::max() / 2);
476 RTC_DCHECK_GE(capture_time_ms, 0);
477 RTC_DCHECK_LE(capture_time_ms, std::numeric_limits<int64_t>::max() / 2);
478 int64_t diff_ms = now_ms - capture_time_ms;
479 RTC_DCHECK_GE(diff_ms, static_cast<int64_t>(0));
480 RTC_DCHECK_LE(diff_ms, std::numeric_limits<int>::max());
481 int new_send_delay = rtc::dchecked_cast<int>(now_ms - capture_time_ms);
482 SendDelayMap::iterator it;
483 bool inserted;
484 std::tie(it, inserted) =
485 send_delays_.insert(std::make_pair(now_ms, new_send_delay));
486 if (!inserted) {
487 // TODO(terelius): If we have multiple delay measurements during the same
488 // millisecond then we keep the most recent one. It is not clear that this
489 // is the right decision, but it preserves an earlier behavior.
490 int previous_send_delay = it->second;
491 sum_delays_ms_ -= previous_send_delay;
492 it->second = new_send_delay;
493 if (max_delay_it_ == it && new_send_delay < previous_send_delay) {
494 RecomputeMaxSendDelay();
495 }
496 }
497 if (max_delay_it_ == send_delays_.end() ||
498 it->second >= max_delay_it_->second) {
499 max_delay_it_ = it;
500 }
501 sum_delays_ms_ += new_send_delay;
502
503 size_t num_delays = send_delays_.size();
504 RTC_DCHECK(max_delay_it_ != send_delays_.end());
505 max_delay_ms = rtc::dchecked_cast<int>(max_delay_it_->second);
506 int64_t avg_ms = (sum_delays_ms_ + num_delays / 2) / num_delays;
507 RTC_DCHECK_GE(avg_ms, static_cast<int64_t>(0));
508 RTC_DCHECK_LE(avg_ms,
509 static_cast<int64_t>(std::numeric_limits<int>::max()));
510 avg_delay_ms =
511 rtc::dchecked_cast<int>((sum_delays_ms_ + num_delays / 2) / num_delays);
512 }
513 send_side_delay_observer_->SendSideDelayUpdated(avg_delay_ms, max_delay_ms,
514 ssrc);
515 }
516
RecomputeMaxSendDelay()517 void RtpSenderEgress::RecomputeMaxSendDelay() {
518 max_delay_it_ = send_delays_.begin();
519 for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) {
520 if (it->second >= max_delay_it_->second) {
521 max_delay_it_ = it;
522 }
523 }
524 }
525
UpdateOnSendPacket(int packet_id,int64_t capture_time_ms,uint32_t ssrc)526 void RtpSenderEgress::UpdateOnSendPacket(int packet_id,
527 int64_t capture_time_ms,
528 uint32_t ssrc) {
529 if (!send_packet_observer_ || capture_time_ms <= 0 || packet_id == -1) {
530 return;
531 }
532
533 send_packet_observer_->OnSendPacket(packet_id, capture_time_ms, ssrc);
534 }
535
SendPacketToNetwork(const RtpPacketToSend & packet,const PacketOptions & options,const PacedPacketInfo & pacing_info)536 bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
537 const PacketOptions& options,
538 const PacedPacketInfo& pacing_info) {
539 int bytes_sent = -1;
540 if (transport_) {
541 bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
542 ? static_cast<int>(packet.size())
543 : -1;
544 if (event_log_ && bytes_sent > 0) {
545 event_log_->Log(std::make_unique<RtcEventRtpPacketOutgoing>(
546 packet, pacing_info.probe_cluster_id));
547 }
548 }
549
550 if (bytes_sent <= 0) {
551 RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
552 return false;
553 }
554 return true;
555 }
556
UpdateRtpStats(int64_t now_ms,uint32_t packet_ssrc,RtpPacketMediaType packet_type,RtpPacketCounter counter,size_t packet_size)557 void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
558 uint32_t packet_ssrc,
559 RtpPacketMediaType packet_type,
560 RtpPacketCounter counter,
561 size_t packet_size) {
562 RTC_DCHECK_RUN_ON(worker_queue_);
563
564 // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
565 // worker thread.
566 RtpSendRates send_rates;
567 {
568 MutexLock lock(&lock_);
569
570 // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
571 // only touched on the worker thread.
572 StreamDataCounters* counters =
573 packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
574
575 if (counters->first_packet_time_ms == -1) {
576 counters->first_packet_time_ms = now_ms;
577 }
578
579 if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
580 counters->fec.Add(counter);
581 } else if (packet_type == RtpPacketMediaType::kRetransmission) {
582 counters->retransmitted.Add(counter);
583 }
584 counters->transmitted.Add(counter);
585
586 send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now_ms);
587 if (bitrate_callback_) {
588 send_rates = GetSendRatesLocked(now_ms);
589 }
590
591 if (rtp_stats_callback_) {
592 rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc);
593 }
594 }
595
596 // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
597 // to the same object, so these callbacks could be consolidated into one.
598 if (bitrate_callback_) {
599 bitrate_callback_->Notify(
600 send_rates.Sum().bps(),
601 send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
602 }
603 }
604
PeriodicUpdate()605 void RtpSenderEgress::PeriodicUpdate() {
606 RTC_DCHECK_RUN_ON(worker_queue_);
607 RTC_DCHECK(bitrate_callback_);
608 RtpSendRates send_rates = GetSendRates();
609 bitrate_callback_->Notify(
610 send_rates.Sum().bps(),
611 send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
612 }
613
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
// Plots the total and retransmission ("Nack") bitrates in kbps, under audio
// or video labels depending on `is_audio_`. Worker queue only; compiled only
// when BWE test logging is enabled.
void RtpSenderEgress::BweTestLoggingPlot(int64_t now_ms, uint32_t packet_ssrc) {
  RTC_DCHECK_RUN_ON(worker_queue_);

  const auto send_rates = GetSendRates();
  if (!is_audio_) {
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now_ms,
                                    send_rates.Sum().kbps(), packet_ssrc);
    BWE_TEST_LOGGING_PLOT_WITH_SSRC(
        1, "VideoNackBitrate_kbps", now_ms,
        send_rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc);
    return;
  }
  BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now_ms,
                                  send_rates.Sum().kbps(), packet_ssrc);
  BWE_TEST_LOGGING_PLOT_WITH_SSRC(
      1, "AudioNackBitrate_kbps", now_ms,
      send_rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc);
}
#endif  // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
634
635 } // namespace webrtc
636