xref: /aosp_15_r20/external/webrtc/modules/rtp_rtcp/source/rtp_sender_egress.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/rtp_rtcp/source/rtp_sender_egress.h"
12 
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17 
18 #include "absl/strings/match.h"
19 #include "api/transport/field_trial_based_config.h"
20 #include "logging/rtc_event_log/events/rtc_event_rtp_packet_outgoing.h"
21 #include "rtc_base/logging.h"
22 
23 namespace webrtc {
24 namespace {
25 constexpr uint32_t kTimestampTicksPerMs = 90;
26 constexpr int kSendSideDelayWindowMs = 1000;
27 constexpr int kBitrateStatisticsWindowMs = 1000;
28 constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;
29 constexpr TimeDelta kUpdateInterval =
30     TimeDelta::Millis(kBitrateStatisticsWindowMs);
31 }  // namespace
32 
NonPacedPacketSender(RtpSenderEgress * sender,PacketSequencer * sequencer)33 RtpSenderEgress::NonPacedPacketSender::NonPacedPacketSender(
34     RtpSenderEgress* sender,
35     PacketSequencer* sequencer)
36     : transport_sequence_number_(0), sender_(sender), sequencer_(sequencer) {
37   RTC_DCHECK(sequencer);
38 }
39 RtpSenderEgress::NonPacedPacketSender::~NonPacedPacketSender() = default;
40 
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)41 void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
42     std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
43   for (auto& packet : packets) {
44     PrepareForSend(packet.get());
45     sender_->SendPacket(packet.get(), PacedPacketInfo());
46   }
47   auto fec_packets = sender_->FetchFecPackets();
48   if (!fec_packets.empty()) {
49     EnqueuePackets(std::move(fec_packets));
50   }
51 }
52 
PrepareForSend(RtpPacketToSend * packet)53 void RtpSenderEgress::NonPacedPacketSender::PrepareForSend(
54     RtpPacketToSend* packet) {
55   // Assign sequence numbers, but not for flexfec which is already running on
56   // an internally maintained sequence number series.
57   if (packet->Ssrc() != sender_->FlexFecSsrc()) {
58     sequencer_->Sequence(*packet);
59   }
60   if (!packet->SetExtension<TransportSequenceNumber>(
61           ++transport_sequence_number_)) {
62     --transport_sequence_number_;
63   }
64   packet->ReserveExtension<TransmissionOffset>();
65   packet->ReserveExtension<AbsoluteSendTime>();
66 }
67 
RtpSenderEgress(const RtpRtcpInterface::Configuration & config,RtpPacketHistory * packet_history)68 RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
69                                  RtpPacketHistory* packet_history)
70     : worker_queue_(TaskQueueBase::Current()),
71       ssrc_(config.local_media_ssrc),
72       rtx_ssrc_(config.rtx_send_ssrc),
73       flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
74                                          : absl::nullopt),
75       populate_network2_timestamp_(config.populate_network2_timestamp),
76       clock_(config.clock),
77       packet_history_(packet_history),
78       transport_(config.outgoing_transport),
79       event_log_(config.event_log),
80 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
81       is_audio_(config.audio),
82 #endif
83       need_rtp_packet_infos_(config.need_rtp_packet_infos),
84       fec_generator_(config.fec_generator),
85       transport_feedback_observer_(config.transport_feedback_callback),
86       send_side_delay_observer_(config.send_side_delay_observer),
87       send_packet_observer_(config.send_packet_observer),
88       rtp_stats_callback_(config.rtp_stats_callback),
89       bitrate_callback_(config.send_bitrate_observer),
90       media_has_been_sent_(false),
91       force_part_of_allocation_(false),
92       timestamp_offset_(0),
93       max_delay_it_(send_delays_.end()),
94       sum_delays_ms_(0),
95       send_rates_(kNumMediaTypes,
96                   {kBitrateStatisticsWindowMs, RateStatistics::kBpsScale}),
97       rtp_sequence_number_map_(need_rtp_packet_infos_
98                                    ? std::make_unique<RtpSequenceNumberMap>(
99                                          kRtpSequenceNumberMapMaxEntries)
100                                    : nullptr) {
101   RTC_DCHECK(worker_queue_);
102   pacer_checker_.Detach();
103   if (bitrate_callback_) {
104     update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
__anona0a773940202() 105                                                      kUpdateInterval, [this]() {
106                                                        PeriodicUpdate();
107                                                        return kUpdateInterval;
108                                                      });
109   }
110 }
111 
~RtpSenderEgress()112 RtpSenderEgress::~RtpSenderEgress() {
113   RTC_DCHECK_RUN_ON(worker_queue_);
114   update_task_.Stop();
115 }
116 
SendPacket(RtpPacketToSend * packet,const PacedPacketInfo & pacing_info)117 void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
118                                  const PacedPacketInfo& pacing_info) {
119   RTC_DCHECK_RUN_ON(&pacer_checker_);
120   RTC_DCHECK(packet);
121 
122   if (packet->Ssrc() == ssrc_ &&
123       packet->packet_type() != RtpPacketMediaType::kRetransmission) {
124     if (last_sent_seq_.has_value()) {
125       RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_seq_ + 1),
126                     packet->SequenceNumber());
127     }
128     last_sent_seq_ = packet->SequenceNumber();
129   } else if (packet->Ssrc() == rtx_ssrc_) {
130     if (last_sent_rtx_seq_.has_value()) {
131       RTC_DCHECK_EQ(static_cast<uint16_t>(*last_sent_rtx_seq_ + 1),
132                     packet->SequenceNumber());
133     }
134     last_sent_rtx_seq_ = packet->SequenceNumber();
135   }
136 
137   RTC_DCHECK(packet->packet_type().has_value());
138   RTC_DCHECK(HasCorrectSsrc(*packet));
139   if (packet->packet_type() == RtpPacketMediaType::kRetransmission) {
140     RTC_DCHECK(packet->retransmitted_sequence_number().has_value());
141   }
142 
143   const uint32_t packet_ssrc = packet->Ssrc();
144   const Timestamp now = clock_->CurrentTime();
145 
146 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
147   worker_queue_->PostTask(
148       SafeTask(task_safety_.flag(), [this, now, packet_ssrc]() {
149         BweTestLoggingPlot(now.ms(), packet_ssrc);
150       }));
151 #endif
152 
153   if (need_rtp_packet_infos_ &&
154       packet->packet_type() == RtpPacketToSend::Type::kVideo) {
155     worker_queue_->PostTask(SafeTask(
156         task_safety_.flag(),
157         [this, packet_timestamp = packet->Timestamp(),
158          is_first_packet_of_frame = packet->is_first_packet_of_frame(),
159          is_last_packet_of_frame = packet->Marker(),
160          sequence_number = packet->SequenceNumber()]() {
161           RTC_DCHECK_RUN_ON(worker_queue_);
162           // Last packet of a frame, add it to sequence number info map.
163           const uint32_t timestamp = packet_timestamp - timestamp_offset_;
164           rtp_sequence_number_map_->InsertPacket(
165               sequence_number,
166               RtpSequenceNumberMap::Info(timestamp, is_first_packet_of_frame,
167                                          is_last_packet_of_frame));
168         }));
169   }
170 
171   if (fec_generator_ && packet->fec_protect_packet()) {
172     // This packet should be protected by FEC, add it to packet generator.
173     RTC_DCHECK(fec_generator_);
174     RTC_DCHECK(packet->packet_type() == RtpPacketMediaType::kVideo);
175     absl::optional<std::pair<FecProtectionParams, FecProtectionParams>>
176         new_fec_params;
177     {
178       MutexLock lock(&lock_);
179       new_fec_params.swap(pending_fec_params_);
180     }
181     if (new_fec_params) {
182       fec_generator_->SetProtectionParameters(new_fec_params->first,
183                                               new_fec_params->second);
184     }
185     if (packet->is_red()) {
186       RtpPacketToSend unpacked_packet(*packet);
187 
188       const rtc::CopyOnWriteBuffer buffer = packet->Buffer();
189       // Grab media payload type from RED header.
190       const size_t headers_size = packet->headers_size();
191       unpacked_packet.SetPayloadType(buffer[headers_size]);
192 
193       // Copy the media payload into the unpacked buffer.
194       uint8_t* payload_buffer =
195           unpacked_packet.SetPayloadSize(packet->payload_size() - 1);
196       std::copy(&packet->payload()[0] + 1,
197                 &packet->payload()[0] + packet->payload_size(), payload_buffer);
198 
199       fec_generator_->AddPacketAndGenerateFec(unpacked_packet);
200     } else {
201       // If not RED encapsulated - we can just insert packet directly.
202       fec_generator_->AddPacketAndGenerateFec(*packet);
203     }
204   }
205 
206   // Bug webrtc:7859. While FEC is invoked from rtp_sender_video, and not after
207   // the pacer, these modifications of the header below are happening after the
208   // FEC protection packets are calculated. This will corrupt recovered packets
209   // at the same place. It's not an issue for extensions, which are present in
210   // all the packets (their content just may be incorrect on recovered packets).
211   // In case of VideoTimingExtension, since it's present not in every packet,
212   // data after rtp header may be corrupted if these packets are protected by
213   // the FEC.
214   TimeDelta diff = now - packet->capture_time();
215   if (packet->HasExtension<TransmissionOffset>()) {
216     packet->SetExtension<TransmissionOffset>(kTimestampTicksPerMs * diff.ms());
217   }
218   if (packet->HasExtension<AbsoluteSendTime>()) {
219     packet->SetExtension<AbsoluteSendTime>(AbsoluteSendTime::To24Bits(now));
220   }
221 
222   if (packet->HasExtension<VideoTimingExtension>()) {
223     if (populate_network2_timestamp_) {
224       packet->set_network2_time(now);
225     } else {
226       packet->set_pacer_exit_time(now);
227     }
228   }
229 
230   const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
231                         packet->packet_type() == RtpPacketMediaType::kVideo;
232 
233   PacketOptions options;
234   {
235     MutexLock lock(&lock_);
236     options.included_in_allocation = force_part_of_allocation_;
237   }
238 
239   // Downstream code actually uses this flag to distinguish between media and
240   // everything else.
241   options.is_retransmit = !is_media;
242   if (auto packet_id = packet->GetExtension<TransportSequenceNumber>()) {
243     options.packet_id = *packet_id;
244     options.included_in_feedback = true;
245     options.included_in_allocation = true;
246     AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
247   }
248 
249   options.additional_data = packet->additional_data();
250 
251   if (packet->packet_type() != RtpPacketMediaType::kPadding &&
252       packet->packet_type() != RtpPacketMediaType::kRetransmission) {
253     UpdateDelayStatistics(packet->capture_time().ms(), now.ms(), packet_ssrc);
254     UpdateOnSendPacket(options.packet_id, packet->capture_time().ms(),
255                        packet_ssrc);
256   }
257 
258   const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
259 
260   // Put packet in retransmission history or update pending status even if
261   // actual sending fails.
262   if (is_media && packet->allow_retransmission()) {
263     packet_history_->PutRtpPacket(std::make_unique<RtpPacketToSend>(*packet),
264                                   now);
265   } else if (packet->retransmitted_sequence_number()) {
266     packet_history_->MarkPacketAsSent(*packet->retransmitted_sequence_number());
267   }
268 
269   if (send_success) {
270     // `media_has_been_sent_` is used by RTPSender to figure out if it can send
271     // padding in the absence of transport-cc or abs-send-time.
272     // In those cases media must be sent first to set a reference timestamp.
273     media_has_been_sent_ = true;
274 
275     // TODO(sprang): Add support for FEC protecting all header extensions, add
276     // media packet to generator here instead.
277 
278     RTC_DCHECK(packet->packet_type().has_value());
279     RtpPacketMediaType packet_type = *packet->packet_type();
280     RtpPacketCounter counter(*packet);
281     size_t size = packet->size();
282     worker_queue_->PostTask(
283         SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type,
284                                        counter = std::move(counter), size]() {
285           RTC_DCHECK_RUN_ON(worker_queue_);
286           UpdateRtpStats(now.ms(), packet_ssrc, packet_type, std::move(counter),
287                          size);
288         }));
289   }
290 }
291 
GetSendRates() const292 RtpSendRates RtpSenderEgress::GetSendRates() const {
293   MutexLock lock(&lock_);
294   const int64_t now_ms = clock_->TimeInMilliseconds();
295   return GetSendRatesLocked(now_ms);
296 }
297 
GetSendRatesLocked(int64_t now_ms) const298 RtpSendRates RtpSenderEgress::GetSendRatesLocked(int64_t now_ms) const {
299   RtpSendRates current_rates;
300   for (size_t i = 0; i < kNumMediaTypes; ++i) {
301     RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
302     current_rates[type] =
303         DataRate::BitsPerSec(send_rates_[i].Rate(now_ms).value_or(0));
304   }
305   return current_rates;
306 }
307 
GetDataCounters(StreamDataCounters * rtp_stats,StreamDataCounters * rtx_stats) const308 void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
309                                       StreamDataCounters* rtx_stats) const {
310   // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
311   // only touched on the worker thread.
312   MutexLock lock(&lock_);
313   *rtp_stats = rtp_stats_;
314   *rtx_stats = rtx_rtp_stats_;
315 }
316 
ForceIncludeSendPacketsInAllocation(bool part_of_allocation)317 void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
318     bool part_of_allocation) {
319   MutexLock lock(&lock_);
320   force_part_of_allocation_ = part_of_allocation;
321 }
322 
MediaHasBeenSent() const323 bool RtpSenderEgress::MediaHasBeenSent() const {
324   RTC_DCHECK_RUN_ON(&pacer_checker_);
325   return media_has_been_sent_;
326 }
327 
SetMediaHasBeenSent(bool media_sent)328 void RtpSenderEgress::SetMediaHasBeenSent(bool media_sent) {
329   RTC_DCHECK_RUN_ON(&pacer_checker_);
330   media_has_been_sent_ = media_sent;
331 }
332 
SetTimestampOffset(uint32_t timestamp)333 void RtpSenderEgress::SetTimestampOffset(uint32_t timestamp) {
334   RTC_DCHECK_RUN_ON(worker_queue_);
335   timestamp_offset_ = timestamp;
336 }
337 
GetSentRtpPacketInfos(rtc::ArrayView<const uint16_t> sequence_numbers) const338 std::vector<RtpSequenceNumberMap::Info> RtpSenderEgress::GetSentRtpPacketInfos(
339     rtc::ArrayView<const uint16_t> sequence_numbers) const {
340   RTC_DCHECK_RUN_ON(worker_queue_);
341   RTC_DCHECK(!sequence_numbers.empty());
342   if (!need_rtp_packet_infos_) {
343     return std::vector<RtpSequenceNumberMap::Info>();
344   }
345 
346   std::vector<RtpSequenceNumberMap::Info> results;
347   results.reserve(sequence_numbers.size());
348 
349   for (uint16_t sequence_number : sequence_numbers) {
350     const auto& info = rtp_sequence_number_map_->Get(sequence_number);
351     if (!info) {
352       // The empty vector will be returned. We can delay the clearing
353       // of the vector until after we exit the critical section.
354       return std::vector<RtpSequenceNumberMap::Info>();
355     }
356     results.push_back(*info);
357   }
358 
359   return results;
360 }
361 
SetFecProtectionParameters(const FecProtectionParams & delta_params,const FecProtectionParams & key_params)362 void RtpSenderEgress::SetFecProtectionParameters(
363     const FecProtectionParams& delta_params,
364     const FecProtectionParams& key_params) {
365   // TODO(sprang): Post task to pacer queue instead, one pacer is fully
366   // migrated to a task queue.
367   MutexLock lock(&lock_);
368   pending_fec_params_.emplace(delta_params, key_params);
369 }
370 
371 std::vector<std::unique_ptr<RtpPacketToSend>>
FetchFecPackets()372 RtpSenderEgress::FetchFecPackets() {
373   RTC_DCHECK_RUN_ON(&pacer_checker_);
374   if (fec_generator_) {
375     return fec_generator_->GetFecPackets();
376   }
377   return {};
378 }
379 
OnAbortedRetransmissions(rtc::ArrayView<const uint16_t> sequence_numbers)380 void RtpSenderEgress::OnAbortedRetransmissions(
381     rtc::ArrayView<const uint16_t> sequence_numbers) {
382   RTC_DCHECK_RUN_ON(&pacer_checker_);
383   // Mark aborted retransmissions as sent, rather than leaving them in
384   // a 'pending' state - otherwise they can not be requested again and
385   // will not be cleared until the history has reached its max size.
386   for (uint16_t seq_no : sequence_numbers) {
387     packet_history_->MarkPacketAsSent(seq_no);
388   }
389 }
390 
HasCorrectSsrc(const RtpPacketToSend & packet) const391 bool RtpSenderEgress::HasCorrectSsrc(const RtpPacketToSend& packet) const {
392   switch (*packet.packet_type()) {
393     case RtpPacketMediaType::kAudio:
394     case RtpPacketMediaType::kVideo:
395       return packet.Ssrc() == ssrc_;
396     case RtpPacketMediaType::kRetransmission:
397     case RtpPacketMediaType::kPadding:
398       // Both padding and retransmission must be on either the media or the
399       // RTX stream.
400       return packet.Ssrc() == rtx_ssrc_ || packet.Ssrc() == ssrc_;
401     case RtpPacketMediaType::kForwardErrorCorrection:
402       // FlexFEC is on separate SSRC, ULPFEC uses media SSRC.
403       return packet.Ssrc() == ssrc_ || packet.Ssrc() == flexfec_ssrc_;
404   }
405   return false;
406 }
407 
AddPacketToTransportFeedback(uint16_t packet_id,const RtpPacketToSend & packet,const PacedPacketInfo & pacing_info)408 void RtpSenderEgress::AddPacketToTransportFeedback(
409     uint16_t packet_id,
410     const RtpPacketToSend& packet,
411     const PacedPacketInfo& pacing_info) {
412   if (transport_feedback_observer_) {
413     RtpPacketSendInfo packet_info;
414     packet_info.transport_sequence_number = packet_id;
415     packet_info.rtp_timestamp = packet.Timestamp();
416     packet_info.length = packet.size();
417     packet_info.pacing_info = pacing_info;
418     packet_info.packet_type = packet.packet_type();
419 
420     switch (*packet_info.packet_type) {
421       case RtpPacketMediaType::kAudio:
422       case RtpPacketMediaType::kVideo:
423         packet_info.media_ssrc = ssrc_;
424         packet_info.rtp_sequence_number = packet.SequenceNumber();
425         break;
426       case RtpPacketMediaType::kRetransmission:
427         // For retransmissions, we're want to remove the original media packet
428         // if the retransmit arrives - so populate that in the packet info.
429         packet_info.media_ssrc = ssrc_;
430         packet_info.rtp_sequence_number =
431             *packet.retransmitted_sequence_number();
432         break;
433       case RtpPacketMediaType::kPadding:
434       case RtpPacketMediaType::kForwardErrorCorrection:
435         // We're not interested in feedback about these packets being received
436         // or lost.
437         break;
438     }
439 
440     transport_feedback_observer_->OnAddPacket(packet_info);
441   }
442 }
443 
UpdateDelayStatistics(int64_t capture_time_ms,int64_t now_ms,uint32_t ssrc)444 void RtpSenderEgress::UpdateDelayStatistics(int64_t capture_time_ms,
445                                             int64_t now_ms,
446                                             uint32_t ssrc) {
447   if (!send_side_delay_observer_ || capture_time_ms <= 0)
448     return;
449 
450   int avg_delay_ms = 0;
451   int max_delay_ms = 0;
452   {
453     MutexLock lock(&lock_);
454     // Compute the max and average of the recent capture-to-send delays.
455     // The time complexity of the current approach depends on the distribution
456     // of the delay values. This could be done more efficiently.
457 
458     // Remove elements older than kSendSideDelayWindowMs.
459     auto lower_bound =
460         send_delays_.lower_bound(now_ms - kSendSideDelayWindowMs);
461     for (auto it = send_delays_.begin(); it != lower_bound; ++it) {
462       if (max_delay_it_ == it) {
463         max_delay_it_ = send_delays_.end();
464       }
465       sum_delays_ms_ -= it->second;
466     }
467     send_delays_.erase(send_delays_.begin(), lower_bound);
468     if (max_delay_it_ == send_delays_.end()) {
469       // Removed the previous max. Need to recompute.
470       RecomputeMaxSendDelay();
471     }
472 
473     // Add the new element.
474     RTC_DCHECK_GE(now_ms, 0);
475     RTC_DCHECK_LE(now_ms, std::numeric_limits<int64_t>::max() / 2);
476     RTC_DCHECK_GE(capture_time_ms, 0);
477     RTC_DCHECK_LE(capture_time_ms, std::numeric_limits<int64_t>::max() / 2);
478     int64_t diff_ms = now_ms - capture_time_ms;
479     RTC_DCHECK_GE(diff_ms, static_cast<int64_t>(0));
480     RTC_DCHECK_LE(diff_ms, std::numeric_limits<int>::max());
481     int new_send_delay = rtc::dchecked_cast<int>(now_ms - capture_time_ms);
482     SendDelayMap::iterator it;
483     bool inserted;
484     std::tie(it, inserted) =
485         send_delays_.insert(std::make_pair(now_ms, new_send_delay));
486     if (!inserted) {
487       // TODO(terelius): If we have multiple delay measurements during the same
488       // millisecond then we keep the most recent one. It is not clear that this
489       // is the right decision, but it preserves an earlier behavior.
490       int previous_send_delay = it->second;
491       sum_delays_ms_ -= previous_send_delay;
492       it->second = new_send_delay;
493       if (max_delay_it_ == it && new_send_delay < previous_send_delay) {
494         RecomputeMaxSendDelay();
495       }
496     }
497     if (max_delay_it_ == send_delays_.end() ||
498         it->second >= max_delay_it_->second) {
499       max_delay_it_ = it;
500     }
501     sum_delays_ms_ += new_send_delay;
502 
503     size_t num_delays = send_delays_.size();
504     RTC_DCHECK(max_delay_it_ != send_delays_.end());
505     max_delay_ms = rtc::dchecked_cast<int>(max_delay_it_->second);
506     int64_t avg_ms = (sum_delays_ms_ + num_delays / 2) / num_delays;
507     RTC_DCHECK_GE(avg_ms, static_cast<int64_t>(0));
508     RTC_DCHECK_LE(avg_ms,
509                   static_cast<int64_t>(std::numeric_limits<int>::max()));
510     avg_delay_ms =
511         rtc::dchecked_cast<int>((sum_delays_ms_ + num_delays / 2) / num_delays);
512   }
513   send_side_delay_observer_->SendSideDelayUpdated(avg_delay_ms, max_delay_ms,
514                                                   ssrc);
515 }
516 
RecomputeMaxSendDelay()517 void RtpSenderEgress::RecomputeMaxSendDelay() {
518   max_delay_it_ = send_delays_.begin();
519   for (auto it = send_delays_.begin(); it != send_delays_.end(); ++it) {
520     if (it->second >= max_delay_it_->second) {
521       max_delay_it_ = it;
522     }
523   }
524 }
525 
UpdateOnSendPacket(int packet_id,int64_t capture_time_ms,uint32_t ssrc)526 void RtpSenderEgress::UpdateOnSendPacket(int packet_id,
527                                          int64_t capture_time_ms,
528                                          uint32_t ssrc) {
529   if (!send_packet_observer_ || capture_time_ms <= 0 || packet_id == -1) {
530     return;
531   }
532 
533   send_packet_observer_->OnSendPacket(packet_id, capture_time_ms, ssrc);
534 }
535 
SendPacketToNetwork(const RtpPacketToSend & packet,const PacketOptions & options,const PacedPacketInfo & pacing_info)536 bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
537                                           const PacketOptions& options,
538                                           const PacedPacketInfo& pacing_info) {
539   int bytes_sent = -1;
540   if (transport_) {
541     bytes_sent = transport_->SendRtp(packet.data(), packet.size(), options)
542                      ? static_cast<int>(packet.size())
543                      : -1;
544     if (event_log_ && bytes_sent > 0) {
545       event_log_->Log(std::make_unique<RtcEventRtpPacketOutgoing>(
546           packet, pacing_info.probe_cluster_id));
547     }
548   }
549 
550   if (bytes_sent <= 0) {
551     RTC_LOG(LS_WARNING) << "Transport failed to send packet.";
552     return false;
553   }
554   return true;
555 }
556 
UpdateRtpStats(int64_t now_ms,uint32_t packet_ssrc,RtpPacketMediaType packet_type,RtpPacketCounter counter,size_t packet_size)557 void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
558                                      uint32_t packet_ssrc,
559                                      RtpPacketMediaType packet_type,
560                                      RtpPacketCounter counter,
561                                      size_t packet_size) {
562   RTC_DCHECK_RUN_ON(worker_queue_);
563 
564   // TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
565   // worker thread.
566   RtpSendRates send_rates;
567   {
568     MutexLock lock(&lock_);
569 
570     // TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
571     // only touched on the worker thread.
572     StreamDataCounters* counters =
573         packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
574 
575     if (counters->first_packet_time_ms == -1) {
576       counters->first_packet_time_ms = now_ms;
577     }
578 
579     if (packet_type == RtpPacketMediaType::kForwardErrorCorrection) {
580       counters->fec.Add(counter);
581     } else if (packet_type == RtpPacketMediaType::kRetransmission) {
582       counters->retransmitted.Add(counter);
583     }
584     counters->transmitted.Add(counter);
585 
586     send_rates_[static_cast<size_t>(packet_type)].Update(packet_size, now_ms);
587     if (bitrate_callback_) {
588       send_rates = GetSendRatesLocked(now_ms);
589     }
590 
591     if (rtp_stats_callback_) {
592       rtp_stats_callback_->DataCountersUpdated(*counters, packet_ssrc);
593     }
594   }
595 
596   // The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
597   // to the same object, so these callbacks could be consolidated into one.
598   if (bitrate_callback_) {
599     bitrate_callback_->Notify(
600         send_rates.Sum().bps(),
601         send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
602   }
603 }
604 
PeriodicUpdate()605 void RtpSenderEgress::PeriodicUpdate() {
606   RTC_DCHECK_RUN_ON(worker_queue_);
607   RTC_DCHECK(bitrate_callback_);
608   RtpSendRates send_rates = GetSendRates();
609   bitrate_callback_->Notify(
610       send_rates.Sum().bps(),
611       send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
612 }
613 
614 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
BweTestLoggingPlot(int64_t now_ms,uint32_t packet_ssrc)615 void RtpSenderEgress::BweTestLoggingPlot(int64_t now_ms, uint32_t packet_ssrc) {
616   RTC_DCHECK_RUN_ON(worker_queue_);
617 
618   const auto rates = GetSendRates();
619   if (is_audio_) {
620     BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "AudioTotBitrate_kbps", now_ms,
621                                     rates.Sum().kbps(), packet_ssrc);
622     BWE_TEST_LOGGING_PLOT_WITH_SSRC(
623         1, "AudioNackBitrate_kbps", now_ms,
624         rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc);
625   } else {
626     BWE_TEST_LOGGING_PLOT_WITH_SSRC(1, "VideoTotBitrate_kbps", now_ms,
627                                     rates.Sum().kbps(), packet_ssrc);
628     BWE_TEST_LOGGING_PLOT_WITH_SSRC(
629         1, "VideoNackBitrate_kbps", now_ms,
630         rates[RtpPacketMediaType::kRetransmission].kbps(), packet_ssrc);
631   }
632 }
633 #endif  // BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
634 
635 }  // namespace webrtc
636