xref: /aosp_15_r20/external/webrtc/test/network/cross_traffic.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "test/network/cross_traffic.h"
12 
13 #include <math.h>
14 
15 #include <utility>
16 
17 #include "absl/memory/memory.h"
18 #include "absl/types/optional.h"
19 #include "cross_traffic.h"
20 #include "rtc_base/logging.h"
21 #include "rtc_base/numerics/safe_minmax.h"
22 
23 namespace webrtc {
24 namespace test {
25 
RandomWalkCrossTraffic(RandomWalkConfig config,CrossTrafficRoute * traffic_route)26 RandomWalkCrossTraffic::RandomWalkCrossTraffic(RandomWalkConfig config,
27                                                CrossTrafficRoute* traffic_route)
28     : config_(config),
29       traffic_route_(traffic_route),
30       random_(config_.random_seed) {
31   sequence_checker_.Detach();
32 }
33 RandomWalkCrossTraffic::~RandomWalkCrossTraffic() = default;
34 
Process(Timestamp at_time)35 void RandomWalkCrossTraffic::Process(Timestamp at_time) {
36   RTC_DCHECK_RUN_ON(&sequence_checker_);
37   if (last_process_time_.IsMinusInfinity()) {
38     last_process_time_ = at_time;
39   }
40   TimeDelta delta = at_time - last_process_time_;
41   last_process_time_ = at_time;
42 
43   if (at_time - last_update_time_ >= config_.update_interval) {
44     intensity_ += random_.Gaussian(config_.bias, config_.variance) *
45                   sqrt((at_time - last_update_time_).seconds<double>());
46     intensity_ = rtc::SafeClamp(intensity_, 0.0, 1.0);
47     last_update_time_ = at_time;
48   }
49   pending_size_ += TrafficRate() * delta;
50 
51   if (pending_size_ >= config_.min_packet_size &&
52       at_time >= last_send_time_ + config_.min_packet_interval) {
53     traffic_route_->SendPacket(pending_size_.bytes());
54     pending_size_ = DataSize::Zero();
55     last_send_time_ = at_time;
56   }
57 }
58 
GetProcessInterval() const59 TimeDelta RandomWalkCrossTraffic::GetProcessInterval() const {
60   return config_.min_packet_interval;
61 }
62 
TrafficRate() const63 DataRate RandomWalkCrossTraffic::TrafficRate() const {
64   RTC_DCHECK_RUN_ON(&sequence_checker_);
65   return config_.peak_rate * intensity_;
66 }
67 
StatsPrinter()68 ColumnPrinter RandomWalkCrossTraffic::StatsPrinter() {
69   return ColumnPrinter::Lambda(
70       "random_walk_cross_traffic_rate",
71       [this](rtc::SimpleStringBuilder& sb) {
72         sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
73       },
74       32);
75 }
76 
PulsedPeaksCrossTraffic(PulsedPeaksConfig config,CrossTrafficRoute * traffic_route)77 PulsedPeaksCrossTraffic::PulsedPeaksCrossTraffic(
78     PulsedPeaksConfig config,
79     CrossTrafficRoute* traffic_route)
80     : config_(config), traffic_route_(traffic_route) {
81   sequence_checker_.Detach();
82 }
83 PulsedPeaksCrossTraffic::~PulsedPeaksCrossTraffic() = default;
84 
Process(Timestamp at_time)85 void PulsedPeaksCrossTraffic::Process(Timestamp at_time) {
86   RTC_DCHECK_RUN_ON(&sequence_checker_);
87   TimeDelta time_since_toggle = at_time - last_update_time_;
88   if (time_since_toggle.IsInfinite() ||
89       (sending_ && time_since_toggle >= config_.send_duration)) {
90     sending_ = false;
91     last_update_time_ = at_time;
92   } else if (!sending_ && time_since_toggle >= config_.hold_duration) {
93     sending_ = true;
94     last_update_time_ = at_time;
95     // Start sending period.
96     last_send_time_ = at_time;
97   }
98 
99   if (sending_) {
100     DataSize pending_size = config_.peak_rate * (at_time - last_send_time_);
101 
102     if (pending_size >= config_.min_packet_size &&
103         at_time >= last_send_time_ + config_.min_packet_interval) {
104       traffic_route_->SendPacket(pending_size.bytes());
105       last_send_time_ = at_time;
106     }
107   }
108 }
109 
GetProcessInterval() const110 TimeDelta PulsedPeaksCrossTraffic::GetProcessInterval() const {
111   return config_.min_packet_interval;
112 }
113 
TrafficRate() const114 DataRate PulsedPeaksCrossTraffic::TrafficRate() const {
115   RTC_DCHECK_RUN_ON(&sequence_checker_);
116   return sending_ ? config_.peak_rate : DataRate::Zero();
117 }
118 
StatsPrinter()119 ColumnPrinter PulsedPeaksCrossTraffic::StatsPrinter() {
120   return ColumnPrinter::Lambda(
121       "pulsed_peaks_cross_traffic_rate",
122       [this](rtc::SimpleStringBuilder& sb) {
123         sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
124       },
125       32);
126 }
127 
TcpMessageRouteImpl(Clock * clock,TaskQueueBase * task_queue,EmulatedRoute * send_route,EmulatedRoute * ret_route)128 TcpMessageRouteImpl::TcpMessageRouteImpl(Clock* clock,
129                                          TaskQueueBase* task_queue,
130                                          EmulatedRoute* send_route,
131                                          EmulatedRoute* ret_route)
132     : clock_(clock),
133       task_queue_(task_queue),
134       request_route_(send_route,
135                      [this](TcpPacket packet, Timestamp) {
136                        OnRequest(std::move(packet));
137                      }),
138       response_route_(ret_route,
__anon0369fb610402(TcpPacket packet, Timestamp arrival_time) 139                       [this](TcpPacket packet, Timestamp arrival_time) {
140                         OnResponse(std::move(packet), arrival_time);
141                       }) {}
142 
SendMessage(size_t size,std::function<void ()> on_received)143 void TcpMessageRouteImpl::SendMessage(size_t size,
144                                       std::function<void()> on_received) {
145   task_queue_->PostTask(
146       [this, size, handler = std::move(on_received)] {
147         // If we are currently sending a message we won't reset the connection,
148         // we'll act as if the messages are sent in the same TCP stream. This is
149         // intended to simulate recreation of a TCP session for each message
150         // in the typical case while avoiding the complexity overhead of
151         // maintaining multiple virtual TCP sessions in parallel.
152         if (pending_.empty() && in_flight_.empty()) {
153           cwnd_ = 10;
154           ssthresh_ = INFINITY;
155         }
156         int64_t data_left = static_cast<int64_t>(size);
157         int64_t kMaxPacketSize = 1200;
158         int64_t kMinPacketSize = 4;
159         Message message{std::move(handler)};
160         while (data_left > 0) {
161           int64_t packet_size = std::min(data_left, kMaxPacketSize);
162           int fragment_id = next_fragment_id_++;
163           pending_.push_back(MessageFragment{
164               fragment_id,
165               static_cast<size_t>(std::max(kMinPacketSize, packet_size))});
166           message.pending_fragment_ids.insert(fragment_id);
167           data_left -= packet_size;
168         }
169         messages_.emplace_back(message);
170         SendPackets(clock_->CurrentTime());
171       });
172 }
173 
OnRequest(TcpPacket packet_info)174 void TcpMessageRouteImpl::OnRequest(TcpPacket packet_info) {
175   for (auto it = messages_.begin(); it != messages_.end(); ++it) {
176     if (it->pending_fragment_ids.count(packet_info.fragment.fragment_id) != 0) {
177       it->pending_fragment_ids.erase(packet_info.fragment.fragment_id);
178       if (it->pending_fragment_ids.empty()) {
179         it->handler();
180         messages_.erase(it);
181       }
182       break;
183     }
184   }
185   const size_t kAckPacketSize = 20;
186   response_route_.SendPacket(kAckPacketSize, packet_info);
187 }
188 
OnResponse(TcpPacket packet_info,Timestamp at_time)189 void TcpMessageRouteImpl::OnResponse(TcpPacket packet_info, Timestamp at_time) {
190   auto it = in_flight_.find(packet_info.sequence_number);
191   if (it != in_flight_.end()) {
192     last_rtt_ = at_time - packet_info.send_time;
193     in_flight_.erase(it);
194   }
195   auto lost_end = in_flight_.lower_bound(packet_info.sequence_number);
196   for (auto lost_it = in_flight_.begin(); lost_it != lost_end;
197        lost_it = in_flight_.erase(lost_it)) {
198     pending_.push_front(lost_it->second.fragment);
199   }
200 
201   if (packet_info.sequence_number - last_acked_seq_num_ > 1) {
202     HandleLoss(at_time);
203   } else if (cwnd_ <= ssthresh_) {
204     cwnd_ += 1;
205   } else {
206     cwnd_ += 1.0f / cwnd_;
207   }
208   last_acked_seq_num_ =
209       std::max(packet_info.sequence_number, last_acked_seq_num_);
210   SendPackets(at_time);
211 }
212 
HandleLoss(Timestamp at_time)213 void TcpMessageRouteImpl::HandleLoss(Timestamp at_time) {
214   if (at_time - last_reduction_time_ < last_rtt_)
215     return;
216   last_reduction_time_ = at_time;
217   ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
218   cwnd_ = ssthresh_;
219 }
220 
SendPackets(Timestamp at_time)221 void TcpMessageRouteImpl::SendPackets(Timestamp at_time) {
222   const TimeDelta kPacketTimeout = TimeDelta::Seconds(1);
223   int cwnd = std::ceil(cwnd_);
224   int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
225   while (packets_to_send-- > 0 && !pending_.empty()) {
226     auto seq_num = next_sequence_number_++;
227     TcpPacket send;
228     send.sequence_number = seq_num;
229     send.send_time = at_time;
230     send.fragment = pending_.front();
231     pending_.pop_front();
232     request_route_.SendPacket(send.fragment.size, send);
233     in_flight_.insert({seq_num, send});
234     task_queue_->PostDelayedTask(
235         [this, seq_num] {
236           HandlePacketTimeout(seq_num, clock_->CurrentTime());
237         },
238         kPacketTimeout);
239   }
240 }
241 
HandlePacketTimeout(int seq_num,Timestamp at_time)242 void TcpMessageRouteImpl::HandlePacketTimeout(int seq_num, Timestamp at_time) {
243   auto lost = in_flight_.find(seq_num);
244   if (lost != in_flight_.end()) {
245     pending_.push_front(lost->second.fragment);
246     in_flight_.erase(lost);
247     HandleLoss(at_time);
248     SendPackets(at_time);
249   }
250 }
251 
FakeTcpCrossTraffic(FakeTcpConfig config,EmulatedRoute * send_route,EmulatedRoute * ret_route)252 FakeTcpCrossTraffic::FakeTcpCrossTraffic(FakeTcpConfig config,
253                                          EmulatedRoute* send_route,
254                                          EmulatedRoute* ret_route)
255     : conf_(config), route_(this, send_route, ret_route) {}
256 
GetProcessInterval() const257 TimeDelta FakeTcpCrossTraffic::GetProcessInterval() const {
258   return conf_.process_interval;
259 }
260 
Process(Timestamp at_time)261 void FakeTcpCrossTraffic::Process(Timestamp at_time) {
262   SendPackets(at_time);
263 }
264 
OnRequest(int sequence_number,Timestamp at_time)265 void FakeTcpCrossTraffic::OnRequest(int sequence_number, Timestamp at_time) {
266   const size_t kAckPacketSize = 20;
267   route_.SendResponse(kAckPacketSize, sequence_number);
268 }
269 
OnResponse(int sequence_number,Timestamp at_time)270 void FakeTcpCrossTraffic::OnResponse(int sequence_number, Timestamp at_time) {
271   ack_received_ = true;
272   auto it = in_flight_.find(sequence_number);
273   if (it != in_flight_.end()) {
274     last_rtt_ = at_time - in_flight_.at(sequence_number);
275     in_flight_.erase(sequence_number);
276   }
277   if (sequence_number - last_acked_seq_num_ > 1) {
278     HandleLoss(at_time);
279   } else if (cwnd_ <= ssthresh_) {
280     cwnd_ += 1;
281   } else {
282     cwnd_ += 1.0f / cwnd_;
283   }
284   last_acked_seq_num_ = std::max(sequence_number, last_acked_seq_num_);
285   SendPackets(at_time);
286 }
287 
HandleLoss(Timestamp at_time)288 void FakeTcpCrossTraffic::HandleLoss(Timestamp at_time) {
289   if (at_time - last_reduction_time_ < last_rtt_)
290     return;
291   last_reduction_time_ = at_time;
292   ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
293   cwnd_ = ssthresh_;
294 }
295 
SendPackets(Timestamp at_time)296 void FakeTcpCrossTraffic::SendPackets(Timestamp at_time) {
297   int cwnd = std::ceil(cwnd_);
298   int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
299   bool timeouts = false;
300   for (auto it = in_flight_.begin(); it != in_flight_.end();) {
301     if (it->second < at_time - conf_.packet_timeout) {
302       it = in_flight_.erase(it);
303       timeouts = true;
304     } else {
305       ++it;
306     }
307   }
308   if (timeouts)
309     HandleLoss(at_time);
310   for (int i = 0; i < packets_to_send; ++i) {
311     if ((total_sent_ + conf_.packet_size) > conf_.send_limit) {
312       break;
313     }
314     in_flight_.insert({next_sequence_number_, at_time});
315     route_.SendRequest(conf_.packet_size.bytes<size_t>(),
316                        next_sequence_number_++);
317     total_sent_ += conf_.packet_size;
318   }
319 }
320 
321 }  // namespace test
322 }  // namespace webrtc
323