1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "test/network/cross_traffic.h"
12
13 #include <math.h>
14
15 #include <utility>
16
17 #include "absl/memory/memory.h"
18 #include "absl/types/optional.h"
19 #include "cross_traffic.h"
20 #include "rtc_base/logging.h"
21 #include "rtc_base/numerics/safe_minmax.h"
22
23 namespace webrtc {
24 namespace test {
25
RandomWalkCrossTraffic(RandomWalkConfig config,CrossTrafficRoute * traffic_route)26 RandomWalkCrossTraffic::RandomWalkCrossTraffic(RandomWalkConfig config,
27 CrossTrafficRoute* traffic_route)
28 : config_(config),
29 traffic_route_(traffic_route),
30 random_(config_.random_seed) {
31 sequence_checker_.Detach();
32 }
33 RandomWalkCrossTraffic::~RandomWalkCrossTraffic() = default;
34
Process(Timestamp at_time)35 void RandomWalkCrossTraffic::Process(Timestamp at_time) {
36 RTC_DCHECK_RUN_ON(&sequence_checker_);
37 if (last_process_time_.IsMinusInfinity()) {
38 last_process_time_ = at_time;
39 }
40 TimeDelta delta = at_time - last_process_time_;
41 last_process_time_ = at_time;
42
43 if (at_time - last_update_time_ >= config_.update_interval) {
44 intensity_ += random_.Gaussian(config_.bias, config_.variance) *
45 sqrt((at_time - last_update_time_).seconds<double>());
46 intensity_ = rtc::SafeClamp(intensity_, 0.0, 1.0);
47 last_update_time_ = at_time;
48 }
49 pending_size_ += TrafficRate() * delta;
50
51 if (pending_size_ >= config_.min_packet_size &&
52 at_time >= last_send_time_ + config_.min_packet_interval) {
53 traffic_route_->SendPacket(pending_size_.bytes());
54 pending_size_ = DataSize::Zero();
55 last_send_time_ = at_time;
56 }
57 }
58
GetProcessInterval() const59 TimeDelta RandomWalkCrossTraffic::GetProcessInterval() const {
60 return config_.min_packet_interval;
61 }
62
TrafficRate() const63 DataRate RandomWalkCrossTraffic::TrafficRate() const {
64 RTC_DCHECK_RUN_ON(&sequence_checker_);
65 return config_.peak_rate * intensity_;
66 }
67
StatsPrinter()68 ColumnPrinter RandomWalkCrossTraffic::StatsPrinter() {
69 return ColumnPrinter::Lambda(
70 "random_walk_cross_traffic_rate",
71 [this](rtc::SimpleStringBuilder& sb) {
72 sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
73 },
74 32);
75 }
76
PulsedPeaksCrossTraffic(PulsedPeaksConfig config,CrossTrafficRoute * traffic_route)77 PulsedPeaksCrossTraffic::PulsedPeaksCrossTraffic(
78 PulsedPeaksConfig config,
79 CrossTrafficRoute* traffic_route)
80 : config_(config), traffic_route_(traffic_route) {
81 sequence_checker_.Detach();
82 }
83 PulsedPeaksCrossTraffic::~PulsedPeaksCrossTraffic() = default;
84
Process(Timestamp at_time)85 void PulsedPeaksCrossTraffic::Process(Timestamp at_time) {
86 RTC_DCHECK_RUN_ON(&sequence_checker_);
87 TimeDelta time_since_toggle = at_time - last_update_time_;
88 if (time_since_toggle.IsInfinite() ||
89 (sending_ && time_since_toggle >= config_.send_duration)) {
90 sending_ = false;
91 last_update_time_ = at_time;
92 } else if (!sending_ && time_since_toggle >= config_.hold_duration) {
93 sending_ = true;
94 last_update_time_ = at_time;
95 // Start sending period.
96 last_send_time_ = at_time;
97 }
98
99 if (sending_) {
100 DataSize pending_size = config_.peak_rate * (at_time - last_send_time_);
101
102 if (pending_size >= config_.min_packet_size &&
103 at_time >= last_send_time_ + config_.min_packet_interval) {
104 traffic_route_->SendPacket(pending_size.bytes());
105 last_send_time_ = at_time;
106 }
107 }
108 }
109
GetProcessInterval() const110 TimeDelta PulsedPeaksCrossTraffic::GetProcessInterval() const {
111 return config_.min_packet_interval;
112 }
113
TrafficRate() const114 DataRate PulsedPeaksCrossTraffic::TrafficRate() const {
115 RTC_DCHECK_RUN_ON(&sequence_checker_);
116 return sending_ ? config_.peak_rate : DataRate::Zero();
117 }
118
StatsPrinter()119 ColumnPrinter PulsedPeaksCrossTraffic::StatsPrinter() {
120 return ColumnPrinter::Lambda(
121 "pulsed_peaks_cross_traffic_rate",
122 [this](rtc::SimpleStringBuilder& sb) {
123 sb.AppendFormat("%.0lf", TrafficRate().bps() / 8.0);
124 },
125 32);
126 }
127
TcpMessageRouteImpl(Clock * clock,TaskQueueBase * task_queue,EmulatedRoute * send_route,EmulatedRoute * ret_route)128 TcpMessageRouteImpl::TcpMessageRouteImpl(Clock* clock,
129 TaskQueueBase* task_queue,
130 EmulatedRoute* send_route,
131 EmulatedRoute* ret_route)
132 : clock_(clock),
133 task_queue_(task_queue),
134 request_route_(send_route,
135 [this](TcpPacket packet, Timestamp) {
136 OnRequest(std::move(packet));
137 }),
138 response_route_(ret_route,
__anon0369fb610402(TcpPacket packet, Timestamp arrival_time) 139 [this](TcpPacket packet, Timestamp arrival_time) {
140 OnResponse(std::move(packet), arrival_time);
141 }) {}
142
SendMessage(size_t size,std::function<void ()> on_received)143 void TcpMessageRouteImpl::SendMessage(size_t size,
144 std::function<void()> on_received) {
145 task_queue_->PostTask(
146 [this, size, handler = std::move(on_received)] {
147 // If we are currently sending a message we won't reset the connection,
148 // we'll act as if the messages are sent in the same TCP stream. This is
149 // intended to simulate recreation of a TCP session for each message
150 // in the typical case while avoiding the complexity overhead of
151 // maintaining multiple virtual TCP sessions in parallel.
152 if (pending_.empty() && in_flight_.empty()) {
153 cwnd_ = 10;
154 ssthresh_ = INFINITY;
155 }
156 int64_t data_left = static_cast<int64_t>(size);
157 int64_t kMaxPacketSize = 1200;
158 int64_t kMinPacketSize = 4;
159 Message message{std::move(handler)};
160 while (data_left > 0) {
161 int64_t packet_size = std::min(data_left, kMaxPacketSize);
162 int fragment_id = next_fragment_id_++;
163 pending_.push_back(MessageFragment{
164 fragment_id,
165 static_cast<size_t>(std::max(kMinPacketSize, packet_size))});
166 message.pending_fragment_ids.insert(fragment_id);
167 data_left -= packet_size;
168 }
169 messages_.emplace_back(message);
170 SendPackets(clock_->CurrentTime());
171 });
172 }
173
OnRequest(TcpPacket packet_info)174 void TcpMessageRouteImpl::OnRequest(TcpPacket packet_info) {
175 for (auto it = messages_.begin(); it != messages_.end(); ++it) {
176 if (it->pending_fragment_ids.count(packet_info.fragment.fragment_id) != 0) {
177 it->pending_fragment_ids.erase(packet_info.fragment.fragment_id);
178 if (it->pending_fragment_ids.empty()) {
179 it->handler();
180 messages_.erase(it);
181 }
182 break;
183 }
184 }
185 const size_t kAckPacketSize = 20;
186 response_route_.SendPacket(kAckPacketSize, packet_info);
187 }
188
OnResponse(TcpPacket packet_info,Timestamp at_time)189 void TcpMessageRouteImpl::OnResponse(TcpPacket packet_info, Timestamp at_time) {
190 auto it = in_flight_.find(packet_info.sequence_number);
191 if (it != in_flight_.end()) {
192 last_rtt_ = at_time - packet_info.send_time;
193 in_flight_.erase(it);
194 }
195 auto lost_end = in_flight_.lower_bound(packet_info.sequence_number);
196 for (auto lost_it = in_flight_.begin(); lost_it != lost_end;
197 lost_it = in_flight_.erase(lost_it)) {
198 pending_.push_front(lost_it->second.fragment);
199 }
200
201 if (packet_info.sequence_number - last_acked_seq_num_ > 1) {
202 HandleLoss(at_time);
203 } else if (cwnd_ <= ssthresh_) {
204 cwnd_ += 1;
205 } else {
206 cwnd_ += 1.0f / cwnd_;
207 }
208 last_acked_seq_num_ =
209 std::max(packet_info.sequence_number, last_acked_seq_num_);
210 SendPackets(at_time);
211 }
212
HandleLoss(Timestamp at_time)213 void TcpMessageRouteImpl::HandleLoss(Timestamp at_time) {
214 if (at_time - last_reduction_time_ < last_rtt_)
215 return;
216 last_reduction_time_ = at_time;
217 ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
218 cwnd_ = ssthresh_;
219 }
220
SendPackets(Timestamp at_time)221 void TcpMessageRouteImpl::SendPackets(Timestamp at_time) {
222 const TimeDelta kPacketTimeout = TimeDelta::Seconds(1);
223 int cwnd = std::ceil(cwnd_);
224 int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
225 while (packets_to_send-- > 0 && !pending_.empty()) {
226 auto seq_num = next_sequence_number_++;
227 TcpPacket send;
228 send.sequence_number = seq_num;
229 send.send_time = at_time;
230 send.fragment = pending_.front();
231 pending_.pop_front();
232 request_route_.SendPacket(send.fragment.size, send);
233 in_flight_.insert({seq_num, send});
234 task_queue_->PostDelayedTask(
235 [this, seq_num] {
236 HandlePacketTimeout(seq_num, clock_->CurrentTime());
237 },
238 kPacketTimeout);
239 }
240 }
241
HandlePacketTimeout(int seq_num,Timestamp at_time)242 void TcpMessageRouteImpl::HandlePacketTimeout(int seq_num, Timestamp at_time) {
243 auto lost = in_flight_.find(seq_num);
244 if (lost != in_flight_.end()) {
245 pending_.push_front(lost->second.fragment);
246 in_flight_.erase(lost);
247 HandleLoss(at_time);
248 SendPackets(at_time);
249 }
250 }
251
FakeTcpCrossTraffic(FakeTcpConfig config,EmulatedRoute * send_route,EmulatedRoute * ret_route)252 FakeTcpCrossTraffic::FakeTcpCrossTraffic(FakeTcpConfig config,
253 EmulatedRoute* send_route,
254 EmulatedRoute* ret_route)
255 : conf_(config), route_(this, send_route, ret_route) {}
256
GetProcessInterval() const257 TimeDelta FakeTcpCrossTraffic::GetProcessInterval() const {
258 return conf_.process_interval;
259 }
260
Process(Timestamp at_time)261 void FakeTcpCrossTraffic::Process(Timestamp at_time) {
262 SendPackets(at_time);
263 }
264
OnRequest(int sequence_number,Timestamp at_time)265 void FakeTcpCrossTraffic::OnRequest(int sequence_number, Timestamp at_time) {
266 const size_t kAckPacketSize = 20;
267 route_.SendResponse(kAckPacketSize, sequence_number);
268 }
269
OnResponse(int sequence_number,Timestamp at_time)270 void FakeTcpCrossTraffic::OnResponse(int sequence_number, Timestamp at_time) {
271 ack_received_ = true;
272 auto it = in_flight_.find(sequence_number);
273 if (it != in_flight_.end()) {
274 last_rtt_ = at_time - in_flight_.at(sequence_number);
275 in_flight_.erase(sequence_number);
276 }
277 if (sequence_number - last_acked_seq_num_ > 1) {
278 HandleLoss(at_time);
279 } else if (cwnd_ <= ssthresh_) {
280 cwnd_ += 1;
281 } else {
282 cwnd_ += 1.0f / cwnd_;
283 }
284 last_acked_seq_num_ = std::max(sequence_number, last_acked_seq_num_);
285 SendPackets(at_time);
286 }
287
HandleLoss(Timestamp at_time)288 void FakeTcpCrossTraffic::HandleLoss(Timestamp at_time) {
289 if (at_time - last_reduction_time_ < last_rtt_)
290 return;
291 last_reduction_time_ = at_time;
292 ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
293 cwnd_ = ssthresh_;
294 }
295
SendPackets(Timestamp at_time)296 void FakeTcpCrossTraffic::SendPackets(Timestamp at_time) {
297 int cwnd = std::ceil(cwnd_);
298 int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
299 bool timeouts = false;
300 for (auto it = in_flight_.begin(); it != in_flight_.end();) {
301 if (it->second < at_time - conf_.packet_timeout) {
302 it = in_flight_.erase(it);
303 timeouts = true;
304 } else {
305 ++it;
306 }
307 }
308 if (timeouts)
309 HandleLoss(at_time);
310 for (int i = 0; i < packets_to_send; ++i) {
311 if ((total_sent_ + conf_.packet_size) > conf_.send_limit) {
312 break;
313 }
314 in_flight_.insert({next_sequence_number_, at_time});
315 route_.SendRequest(conf_.packet_size.bytes<size_t>(),
316 next_sequence_number_++);
317 total_sent_ += conf_.packet_size;
318 }
319 }
320
321 } // namespace test
322 } // namespace webrtc
323