1 /*
2 * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "test/network/network_emulation.h"
12
13 #include <algorithm>
14 #include <limits>
15 #include <memory>
16 #include <utility>
17
18 #include "absl/types/optional.h"
19 #include "api/numerics/samples_stats_counter.h"
20 #include "api/sequence_checker.h"
21 #include "api/test/network_emulation/network_emulation_interfaces.h"
22 #include "api/test/network_emulation_manager.h"
23 #include "api/units/data_size.h"
24 #include "api/units/time_delta.h"
25 #include "rtc_base/logging.h"
26
27 namespace webrtc {
28 namespace {
29
GetOverallOutgoingStats(const std::map<rtc::IPAddress,EmulatedNetworkOutgoingStats> & outgoing_stats,EmulatedNetworkStatsGatheringMode mode)30 EmulatedNetworkOutgoingStats GetOverallOutgoingStats(
31 const std::map<rtc::IPAddress, EmulatedNetworkOutgoingStats>&
32 outgoing_stats,
33 EmulatedNetworkStatsGatheringMode mode) {
34 EmulatedNetworkOutgoingStatsBuilder builder(mode);
35 for (const auto& entry : outgoing_stats) {
36 builder.AddOutgoingStats(entry.second);
37 }
38 return builder.Build();
39 }
40
GetOverallIncomingStats(const std::map<rtc::IPAddress,EmulatedNetworkIncomingStats> & incoming_stats,EmulatedNetworkStatsGatheringMode mode)41 EmulatedNetworkIncomingStats GetOverallIncomingStats(
42 const std::map<rtc::IPAddress, EmulatedNetworkIncomingStats>&
43 incoming_stats,
44 EmulatedNetworkStatsGatheringMode mode) {
45 EmulatedNetworkIncomingStatsBuilder builder(mode);
46 for (const auto& entry : incoming_stats) {
47 builder.AddIncomingStats(entry.second);
48 }
49 return builder.Build();
50 }
51
52 } // namespace
53
EmulatedNetworkOutgoingStatsBuilder(EmulatedNetworkStatsGatheringMode stats_gathering_mode)54 EmulatedNetworkOutgoingStatsBuilder::EmulatedNetworkOutgoingStatsBuilder(
55 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
56 : stats_gathering_mode_(stats_gathering_mode) {
57 sequence_checker_.Detach();
58 }
59
OnPacketSent(Timestamp sent_time,DataSize packet_size)60 void EmulatedNetworkOutgoingStatsBuilder::OnPacketSent(Timestamp sent_time,
61 DataSize packet_size) {
62 RTC_DCHECK_RUN_ON(&sequence_checker_);
63 RTC_CHECK_GE(packet_size, DataSize::Zero());
64 if (stats_.first_packet_sent_time.IsInfinite()) {
65 stats_.first_packet_sent_time = sent_time;
66 stats_.first_sent_packet_size = packet_size;
67 }
68 stats_.last_packet_sent_time = sent_time;
69 stats_.packets_sent++;
70 stats_.bytes_sent += packet_size;
71 if (stats_gathering_mode_ == EmulatedNetworkStatsGatheringMode::kDebug) {
72 stats_.sent_packets_size.AddSample(packet_size.bytes());
73 }
74 }
75
AddOutgoingStats(const EmulatedNetworkOutgoingStats & stats)76 void EmulatedNetworkOutgoingStatsBuilder::AddOutgoingStats(
77 const EmulatedNetworkOutgoingStats& stats) {
78 RTC_DCHECK_RUN_ON(&sequence_checker_);
79 stats_.packets_sent += stats.packets_sent;
80 stats_.bytes_sent += stats.bytes_sent;
81 stats_.sent_packets_size.AddSamples(stats.sent_packets_size);
82 if (stats_.first_packet_sent_time > stats.first_packet_sent_time) {
83 stats_.first_packet_sent_time = stats.first_packet_sent_time;
84 stats_.first_sent_packet_size = stats.first_sent_packet_size;
85 }
86 if (stats_.last_packet_sent_time < stats.last_packet_sent_time) {
87 stats_.last_packet_sent_time = stats.last_packet_sent_time;
88 }
89 }
90
Build() const91 EmulatedNetworkOutgoingStats EmulatedNetworkOutgoingStatsBuilder::Build()
92 const {
93 RTC_DCHECK_RUN_ON(&sequence_checker_);
94 return stats_;
95 }
96
EmulatedNetworkIncomingStatsBuilder(EmulatedNetworkStatsGatheringMode stats_gathering_mode)97 EmulatedNetworkIncomingStatsBuilder::EmulatedNetworkIncomingStatsBuilder(
98 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
99 : stats_gathering_mode_(stats_gathering_mode) {
100 sequence_checker_.Detach();
101 }
102
OnPacketDropped(DataSize packet_size)103 void EmulatedNetworkIncomingStatsBuilder::OnPacketDropped(
104 DataSize packet_size) {
105 RTC_DCHECK_RUN_ON(&sequence_checker_);
106 stats_.packets_discarded_no_receiver++;
107 stats_.bytes_discarded_no_receiver += packet_size;
108 if (stats_gathering_mode_ == EmulatedNetworkStatsGatheringMode::kDebug) {
109 stats_.packets_discarded_no_receiver_size.AddSample(packet_size.bytes());
110 }
111 }
112
OnPacketReceived(Timestamp received_time,DataSize packet_size)113 void EmulatedNetworkIncomingStatsBuilder::OnPacketReceived(
114 Timestamp received_time,
115 DataSize packet_size) {
116 RTC_DCHECK_RUN_ON(&sequence_checker_);
117 RTC_CHECK_GE(packet_size, DataSize::Zero());
118 if (stats_.first_packet_received_time.IsInfinite()) {
119 stats_.first_packet_received_time = received_time;
120 stats_.first_received_packet_size = packet_size;
121 }
122 stats_.last_packet_received_time = received_time;
123 stats_.packets_received++;
124 stats_.bytes_received += packet_size;
125 if (stats_gathering_mode_ == EmulatedNetworkStatsGatheringMode::kDebug) {
126 stats_.received_packets_size.AddSample(packet_size.bytes());
127 }
128 }
129
AddIncomingStats(const EmulatedNetworkIncomingStats & stats)130 void EmulatedNetworkIncomingStatsBuilder::AddIncomingStats(
131 const EmulatedNetworkIncomingStats& stats) {
132 RTC_DCHECK_RUN_ON(&sequence_checker_);
133 stats_.packets_received += stats.packets_received;
134 stats_.bytes_received += stats.bytes_received;
135 stats_.received_packets_size.AddSamples(stats.received_packets_size);
136 stats_.packets_discarded_no_receiver += stats.packets_discarded_no_receiver;
137 stats_.bytes_discarded_no_receiver += stats.bytes_discarded_no_receiver;
138 stats_.packets_discarded_no_receiver_size.AddSamples(
139 stats.packets_discarded_no_receiver_size);
140 if (stats_.first_packet_received_time > stats.first_packet_received_time) {
141 stats_.first_packet_received_time = stats.first_packet_received_time;
142 stats_.first_received_packet_size = stats.first_received_packet_size;
143 }
144 if (stats_.last_packet_received_time < stats.last_packet_received_time) {
145 stats_.last_packet_received_time = stats.last_packet_received_time;
146 }
147 }
148
Build() const149 EmulatedNetworkIncomingStats EmulatedNetworkIncomingStatsBuilder::Build()
150 const {
151 RTC_DCHECK_RUN_ON(&sequence_checker_);
152 return stats_;
153 }
154
EmulatedNetworkStatsBuilder(EmulatedNetworkStatsGatheringMode stats_gathering_mode)155 EmulatedNetworkStatsBuilder::EmulatedNetworkStatsBuilder(
156 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
157 : stats_gathering_mode_(stats_gathering_mode) {
158 sequence_checker_.Detach();
159 }
160
EmulatedNetworkStatsBuilder(rtc::IPAddress local_ip,EmulatedNetworkStatsGatheringMode stats_gathering_mode)161 EmulatedNetworkStatsBuilder::EmulatedNetworkStatsBuilder(
162 rtc::IPAddress local_ip,
163 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
164 : stats_gathering_mode_(stats_gathering_mode) {
165 local_addresses_.push_back(local_ip);
166 sequence_checker_.Detach();
167 }
168
OnPacketSent(Timestamp queued_time,Timestamp sent_time,rtc::IPAddress destination_ip,DataSize packet_size)169 void EmulatedNetworkStatsBuilder::OnPacketSent(Timestamp queued_time,
170 Timestamp sent_time,
171 rtc::IPAddress destination_ip,
172 DataSize packet_size) {
173 RTC_DCHECK_RUN_ON(&sequence_checker_);
174 if (stats_gathering_mode_ == EmulatedNetworkStatsGatheringMode::kDebug) {
175 sent_packets_queue_wait_time_us_.AddSample((sent_time - queued_time).us());
176 }
177 auto it = outgoing_stats_per_destination_.find(destination_ip);
178 if (it == outgoing_stats_per_destination_.end()) {
179 outgoing_stats_per_destination_
180 .emplace(destination_ip,
181 std::make_unique<EmulatedNetworkOutgoingStatsBuilder>(
182 stats_gathering_mode_))
183 .first->second->OnPacketSent(sent_time, packet_size);
184 } else {
185 it->second->OnPacketSent(sent_time, packet_size);
186 }
187 }
188
OnPacketDropped(rtc::IPAddress source_ip,DataSize packet_size)189 void EmulatedNetworkStatsBuilder::OnPacketDropped(rtc::IPAddress source_ip,
190 DataSize packet_size) {
191 RTC_DCHECK_RUN_ON(&sequence_checker_);
192 auto it = incoming_stats_per_source_.find(source_ip);
193 if (it == incoming_stats_per_source_.end()) {
194 incoming_stats_per_source_
195 .emplace(source_ip,
196 std::make_unique<EmulatedNetworkIncomingStatsBuilder>(
197 stats_gathering_mode_))
198 .first->second->OnPacketDropped(packet_size);
199 } else {
200 it->second->OnPacketDropped(packet_size);
201 }
202 }
203
OnPacketReceived(Timestamp received_time,rtc::IPAddress source_ip,DataSize packet_size)204 void EmulatedNetworkStatsBuilder::OnPacketReceived(Timestamp received_time,
205 rtc::IPAddress source_ip,
206 DataSize packet_size) {
207 RTC_DCHECK_RUN_ON(&sequence_checker_);
208 auto it = incoming_stats_per_source_.find(source_ip);
209 if (it == incoming_stats_per_source_.end()) {
210 incoming_stats_per_source_
211 .emplace(source_ip,
212 std::make_unique<EmulatedNetworkIncomingStatsBuilder>(
213 stats_gathering_mode_))
214 .first->second->OnPacketReceived(received_time, packet_size);
215 } else {
216 it->second->OnPacketReceived(received_time, packet_size);
217 }
218 }
219
AddEmulatedNetworkStats(const EmulatedNetworkStats & stats)220 void EmulatedNetworkStatsBuilder::AddEmulatedNetworkStats(
221 const EmulatedNetworkStats& stats) {
222 RTC_DCHECK_RUN_ON(&sequence_checker_);
223
224 // Append IPs from other endpoints stats to the builder.
225 for (const rtc::IPAddress& addr : stats.local_addresses) {
226 local_addresses_.push_back(addr);
227 }
228
229 sent_packets_queue_wait_time_us_.AddSamples(
230 stats.sent_packets_queue_wait_time_us);
231
232 // Add outgoing stats from other endpoints to the builder.
233 for (const auto& entry : stats.outgoing_stats_per_destination) {
234 auto it = outgoing_stats_per_destination_.find(entry.first);
235 if (it == outgoing_stats_per_destination_.end()) {
236 outgoing_stats_per_destination_
237 .emplace(entry.first,
238 std::make_unique<EmulatedNetworkOutgoingStatsBuilder>(
239 stats_gathering_mode_))
240 .first->second->AddOutgoingStats(entry.second);
241 } else {
242 it->second->AddOutgoingStats(entry.second);
243 }
244 }
245
246 // Add incoming stats from other endpoints to the builder.
247 for (const auto& entry : stats.incoming_stats_per_source) {
248 auto it = incoming_stats_per_source_.find(entry.first);
249 if (it == incoming_stats_per_source_.end()) {
250 incoming_stats_per_source_
251 .emplace(entry.first,
252 std::make_unique<EmulatedNetworkIncomingStatsBuilder>(
253 stats_gathering_mode_))
254 .first->second->AddIncomingStats(entry.second);
255 } else {
256 it->second->AddIncomingStats(entry.second);
257 }
258 }
259 }
260
Build() const261 EmulatedNetworkStats EmulatedNetworkStatsBuilder::Build() const {
262 RTC_DCHECK_RUN_ON(&sequence_checker_);
263 std::map<rtc::IPAddress, EmulatedNetworkOutgoingStats> outgoing_stats;
264 for (const auto& entry : outgoing_stats_per_destination_) {
265 outgoing_stats.emplace(entry.first, entry.second->Build());
266 }
267 std::map<rtc::IPAddress, EmulatedNetworkIncomingStats> incoming_stats;
268 for (const auto& entry : incoming_stats_per_source_) {
269 incoming_stats.emplace(entry.first, entry.second->Build());
270 }
271 return EmulatedNetworkStats{
272 .local_addresses = local_addresses_,
273 .overall_outgoing_stats =
274 GetOverallOutgoingStats(outgoing_stats, stats_gathering_mode_),
275 .overall_incoming_stats =
276 GetOverallIncomingStats(incoming_stats, stats_gathering_mode_),
277 .outgoing_stats_per_destination = std::move(outgoing_stats),
278 .incoming_stats_per_source = std::move(incoming_stats),
279 .sent_packets_queue_wait_time_us = sent_packets_queue_wait_time_us_};
280 }
281
EmulatedNetworkNodeStatsBuilder(EmulatedNetworkStatsGatheringMode stats_gathering_mode)282 EmulatedNetworkNodeStatsBuilder::EmulatedNetworkNodeStatsBuilder(
283 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
284 : stats_gathering_mode_(stats_gathering_mode) {
285 sequence_checker_.Detach();
286 }
287
AddPacketTransportTime(TimeDelta time,size_t packet_size)288 void EmulatedNetworkNodeStatsBuilder::AddPacketTransportTime(
289 TimeDelta time,
290 size_t packet_size) {
291 RTC_DCHECK_RUN_ON(&sequence_checker_);
292 if (stats_gathering_mode_ == EmulatedNetworkStatsGatheringMode::kDebug) {
293 stats_.packet_transport_time.AddSample(time.ms<double>());
294 stats_.size_to_packet_transport_time.AddSample(packet_size /
295 time.ms<double>());
296 }
297 }
298
AddEmulatedNetworkNodeStats(const EmulatedNetworkNodeStats & stats)299 void EmulatedNetworkNodeStatsBuilder::AddEmulatedNetworkNodeStats(
300 const EmulatedNetworkNodeStats& stats) {
301 RTC_DCHECK_RUN_ON(&sequence_checker_);
302 stats_.packet_transport_time.AddSamples(stats.packet_transport_time);
303 stats_.size_to_packet_transport_time.AddSamples(
304 stats.size_to_packet_transport_time);
305 }
306
Build() const307 EmulatedNetworkNodeStats EmulatedNetworkNodeStatsBuilder::Build() const {
308 RTC_DCHECK_RUN_ON(&sequence_checker_);
309 return stats_;
310 }
311
OnPacketReceived(EmulatedIpPacket packet)312 void LinkEmulation::OnPacketReceived(EmulatedIpPacket packet) {
313 task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
314 RTC_DCHECK_RUN_ON(task_queue_);
315
316 uint64_t packet_id = next_packet_id_++;
317 bool sent = network_behavior_->EnqueuePacket(PacketInFlightInfo(
318 packet.ip_packet_size(), packet.arrival_time.us(), packet_id));
319 if (sent) {
320 packets_.emplace_back(StoredPacket{.id = packet_id,
321 .sent_time = clock_->CurrentTime(),
322 .packet = std::move(packet),
323 .removed = false});
324 }
325 if (process_task_.Running())
326 return;
327 absl::optional<int64_t> next_time_us =
328 network_behavior_->NextDeliveryTimeUs();
329 if (!next_time_us)
330 return;
331 Timestamp current_time = clock_->CurrentTime();
332 process_task_ = RepeatingTaskHandle::DelayedStart(
333 task_queue_->Get(),
334 std::max(TimeDelta::Zero(),
335 Timestamp::Micros(*next_time_us) - current_time),
336 [this]() {
337 RTC_DCHECK_RUN_ON(task_queue_);
338 Timestamp current_time = clock_->CurrentTime();
339 Process(current_time);
340 absl::optional<int64_t> next_time_us =
341 network_behavior_->NextDeliveryTimeUs();
342 if (!next_time_us) {
343 process_task_.Stop();
344 return TimeDelta::Zero(); // This is ignored.
345 }
346 RTC_DCHECK_GE(*next_time_us, current_time.us());
347 return Timestamp::Micros(*next_time_us) - current_time;
348 });
349 });
350 }
351
stats() const352 EmulatedNetworkNodeStats LinkEmulation::stats() const {
353 RTC_DCHECK_RUN_ON(task_queue_);
354 return stats_builder_.Build();
355 }
356
Process(Timestamp at_time)357 void LinkEmulation::Process(Timestamp at_time) {
358 std::vector<PacketDeliveryInfo> delivery_infos =
359 network_behavior_->DequeueDeliverablePackets(at_time.us());
360 for (PacketDeliveryInfo& delivery_info : delivery_infos) {
361 StoredPacket* packet = nullptr;
362 for (auto& stored_packet : packets_) {
363 if (stored_packet.id == delivery_info.packet_id) {
364 packet = &stored_packet;
365 break;
366 }
367 }
368 RTC_CHECK(packet);
369 RTC_DCHECK(!packet->removed);
370 packet->removed = true;
371 stats_builder_.AddPacketTransportTime(
372 clock_->CurrentTime() - packet->sent_time,
373 packet->packet.ip_packet_size());
374
375 if (delivery_info.receive_time_us != PacketDeliveryInfo::kNotReceived) {
376 packet->packet.arrival_time =
377 Timestamp::Micros(delivery_info.receive_time_us);
378 receiver_->OnPacketReceived(std::move(packet->packet));
379 }
380 while (!packets_.empty() && packets_.front().removed) {
381 packets_.pop_front();
382 }
383 }
384 }
385
NetworkRouterNode(rtc::TaskQueue * task_queue)386 NetworkRouterNode::NetworkRouterNode(rtc::TaskQueue* task_queue)
387 : task_queue_(task_queue) {}
388
OnPacketReceived(EmulatedIpPacket packet)389 void NetworkRouterNode::OnPacketReceived(EmulatedIpPacket packet) {
390 RTC_DCHECK_RUN_ON(task_queue_);
391 if (watcher_) {
392 watcher_(packet);
393 }
394 if (filter_) {
395 if (!filter_(packet))
396 return;
397 }
398 auto receiver_it = routing_.find(packet.to.ipaddr());
399 if (receiver_it == routing_.end()) {
400 if (default_receiver_.has_value()) {
401 (*default_receiver_)->OnPacketReceived(std::move(packet));
402 }
403 return;
404 }
405 RTC_CHECK(receiver_it != routing_.end());
406
407 receiver_it->second->OnPacketReceived(std::move(packet));
408 }
409
SetReceiver(const rtc::IPAddress & dest_ip,EmulatedNetworkReceiverInterface * receiver)410 void NetworkRouterNode::SetReceiver(
411 const rtc::IPAddress& dest_ip,
412 EmulatedNetworkReceiverInterface* receiver) {
413 task_queue_->PostTask([=] {
414 RTC_DCHECK_RUN_ON(task_queue_);
415 EmulatedNetworkReceiverInterface* cur_receiver = routing_[dest_ip];
416 RTC_CHECK(cur_receiver == nullptr || cur_receiver == receiver)
417 << "Routing for dest_ip=" << dest_ip.ToString() << " already exists";
418 routing_[dest_ip] = receiver;
419 });
420 }
421
RemoveReceiver(const rtc::IPAddress & dest_ip)422 void NetworkRouterNode::RemoveReceiver(const rtc::IPAddress& dest_ip) {
423 RTC_DCHECK_RUN_ON(task_queue_);
424 routing_.erase(dest_ip);
425 }
426
SetDefaultReceiver(EmulatedNetworkReceiverInterface * receiver)427 void NetworkRouterNode::SetDefaultReceiver(
428 EmulatedNetworkReceiverInterface* receiver) {
429 task_queue_->PostTask([=] {
430 RTC_DCHECK_RUN_ON(task_queue_);
431 if (default_receiver_.has_value()) {
432 RTC_CHECK_EQ(*default_receiver_, receiver)
433 << "Router already default receiver";
434 }
435 default_receiver_ = receiver;
436 });
437 }
438
RemoveDefaultReceiver()439 void NetworkRouterNode::RemoveDefaultReceiver() {
440 RTC_DCHECK_RUN_ON(task_queue_);
441 default_receiver_ = absl::nullopt;
442 }
443
SetWatcher(std::function<void (const EmulatedIpPacket &)> watcher)444 void NetworkRouterNode::SetWatcher(
445 std::function<void(const EmulatedIpPacket&)> watcher) {
446 task_queue_->PostTask([=] {
447 RTC_DCHECK_RUN_ON(task_queue_);
448 watcher_ = watcher;
449 });
450 }
451
SetFilter(std::function<bool (const EmulatedIpPacket &)> filter)452 void NetworkRouterNode::SetFilter(
453 std::function<bool(const EmulatedIpPacket&)> filter) {
454 task_queue_->PostTask([=] {
455 RTC_DCHECK_RUN_ON(task_queue_);
456 filter_ = filter;
457 });
458 }
459
EmulatedNetworkNode(Clock * clock,rtc::TaskQueue * task_queue,std::unique_ptr<NetworkBehaviorInterface> network_behavior,EmulatedNetworkStatsGatheringMode stats_gathering_mode)460 EmulatedNetworkNode::EmulatedNetworkNode(
461 Clock* clock,
462 rtc::TaskQueue* task_queue,
463 std::unique_ptr<NetworkBehaviorInterface> network_behavior,
464 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
465 : router_(task_queue),
466 link_(clock,
467 task_queue,
468 std::move(network_behavior),
469 &router_,
470 stats_gathering_mode) {}
471
OnPacketReceived(EmulatedIpPacket packet)472 void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
473 link_.OnPacketReceived(std::move(packet));
474 }
475
stats() const476 EmulatedNetworkNodeStats EmulatedNetworkNode::stats() const {
477 return link_.stats();
478 }
479
CreateRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes,EmulatedNetworkReceiverInterface * receiver)480 void EmulatedNetworkNode::CreateRoute(
481 const rtc::IPAddress& receiver_ip,
482 std::vector<EmulatedNetworkNode*> nodes,
483 EmulatedNetworkReceiverInterface* receiver) {
484 RTC_CHECK(!nodes.empty());
485 for (size_t i = 0; i + 1 < nodes.size(); ++i)
486 nodes[i]->router()->SetReceiver(receiver_ip, nodes[i + 1]);
487 nodes.back()->router()->SetReceiver(receiver_ip, receiver);
488 }
489
ClearRoute(const rtc::IPAddress & receiver_ip,std::vector<EmulatedNetworkNode * > nodes)490 void EmulatedNetworkNode::ClearRoute(const rtc::IPAddress& receiver_ip,
491 std::vector<EmulatedNetworkNode*> nodes) {
492 for (EmulatedNetworkNode* node : nodes)
493 node->router()->RemoveReceiver(receiver_ip);
494 }
495
496 EmulatedNetworkNode::~EmulatedNetworkNode() = default;
497
Options(uint64_t id,const rtc::IPAddress & ip,const EmulatedEndpointConfig & config,EmulatedNetworkStatsGatheringMode stats_gathering_mode)498 EmulatedEndpointImpl::Options::Options(
499 uint64_t id,
500 const rtc::IPAddress& ip,
501 const EmulatedEndpointConfig& config,
502 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
503 : id(id),
504 ip(ip),
505 stats_gathering_mode(stats_gathering_mode),
506 type(config.type),
507 allow_send_packet_with_different_source_ip(
508 config.allow_send_packet_with_different_source_ip),
509 allow_receive_packets_with_different_dest_ip(
510 config.allow_receive_packets_with_different_dest_ip),
511 log_name(ip.ToString() + " (" + config.name.value_or("") + ")") {}
512
EmulatedEndpointImpl(const Options & options,bool is_enabled,rtc::TaskQueue * task_queue,Clock * clock)513 EmulatedEndpointImpl::EmulatedEndpointImpl(const Options& options,
514 bool is_enabled,
515 rtc::TaskQueue* task_queue,
516 Clock* clock)
517 : options_(options),
518 is_enabled_(is_enabled),
519 clock_(clock),
520 task_queue_(task_queue),
521 router_(task_queue_),
522 next_port_(kFirstEphemeralPort),
523 stats_builder_(options_.ip, options_.stats_gathering_mode) {
524 constexpr int kIPv4NetworkPrefixLength = 24;
525 constexpr int kIPv6NetworkPrefixLength = 64;
526
527 int prefix_length = 0;
528 if (options_.ip.family() == AF_INET) {
529 prefix_length = kIPv4NetworkPrefixLength;
530 } else if (options_.ip.family() == AF_INET6) {
531 prefix_length = kIPv6NetworkPrefixLength;
532 }
533 rtc::IPAddress prefix = TruncateIP(options_.ip, prefix_length);
534 network_ = std::make_unique<rtc::Network>(
535 options_.ip.ToString(), "Endpoint id=" + std::to_string(options_.id),
536 prefix, prefix_length, options_.type);
537 network_->AddIP(options_.ip);
538
539 enabled_state_checker_.Detach();
540 RTC_LOG(LS_INFO) << "Created emulated endpoint " << options_.log_name
541 << "; id=" << options_.id;
542 }
543 EmulatedEndpointImpl::~EmulatedEndpointImpl() = default;
544
GetId() const545 uint64_t EmulatedEndpointImpl::GetId() const {
546 return options_.id;
547 }
548
SendPacket(const rtc::SocketAddress & from,const rtc::SocketAddress & to,rtc::CopyOnWriteBuffer packet_data,uint16_t application_overhead)549 void EmulatedEndpointImpl::SendPacket(const rtc::SocketAddress& from,
550 const rtc::SocketAddress& to,
551 rtc::CopyOnWriteBuffer packet_data,
552 uint16_t application_overhead) {
553 if (!options_.allow_send_packet_with_different_source_ip) {
554 RTC_CHECK(from.ipaddr() == options_.ip);
555 }
556 EmulatedIpPacket packet(from, to, std::move(packet_data),
557 clock_->CurrentTime(), application_overhead);
558 task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
559 RTC_DCHECK_RUN_ON(task_queue_);
560 stats_builder_.OnPacketSent(packet.arrival_time, clock_->CurrentTime(),
561 packet.to.ipaddr(),
562 DataSize::Bytes(packet.ip_packet_size()));
563
564 if (packet.to.ipaddr() == options_.ip) {
565 OnPacketReceived(std::move(packet));
566 } else {
567 router_.OnPacketReceived(std::move(packet));
568 }
569 });
570 }
571
BindReceiver(uint16_t desired_port,EmulatedNetworkReceiverInterface * receiver)572 absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiver(
573 uint16_t desired_port,
574 EmulatedNetworkReceiverInterface* receiver) {
575 return BindReceiverInternal(desired_port, receiver, /*is_one_shot=*/false);
576 }
577
BindOneShotReceiver(uint16_t desired_port,EmulatedNetworkReceiverInterface * receiver)578 absl::optional<uint16_t> EmulatedEndpointImpl::BindOneShotReceiver(
579 uint16_t desired_port,
580 EmulatedNetworkReceiverInterface* receiver) {
581 return BindReceiverInternal(desired_port, receiver, /*is_one_shot=*/true);
582 }
583
BindReceiverInternal(uint16_t desired_port,EmulatedNetworkReceiverInterface * receiver,bool is_one_shot)584 absl::optional<uint16_t> EmulatedEndpointImpl::BindReceiverInternal(
585 uint16_t desired_port,
586 EmulatedNetworkReceiverInterface* receiver,
587 bool is_one_shot) {
588 MutexLock lock(&receiver_lock_);
589 uint16_t port = desired_port;
590 if (port == 0) {
591 // Because client can specify its own port, next_port_ can be already in
592 // use, so we need to find next available port.
593 int ports_pool_size =
594 std::numeric_limits<uint16_t>::max() - kFirstEphemeralPort + 1;
595 for (int i = 0; i < ports_pool_size; ++i) {
596 uint16_t next_port = NextPort();
597 if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) {
598 port = next_port;
599 break;
600 }
601 }
602 }
603 RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
604 << options_.log_name << "; id=" << options_.id;
605 bool result =
606 port_to_receiver_.insert({port, {receiver, is_one_shot}}).second;
607 if (!result) {
608 RTC_LOG(LS_INFO) << "Can't bind receiver to used port " << desired_port
609 << " in endpoint " << options_.log_name
610 << "; id=" << options_.id;
611 return absl::nullopt;
612 }
613 RTC_LOG(LS_INFO) << "New receiver is binded to endpoint " << options_.log_name
614 << "; id=" << options_.id << " on port " << port;
615 return port;
616 }
617
NextPort()618 uint16_t EmulatedEndpointImpl::NextPort() {
619 uint16_t out = next_port_;
620 if (next_port_ == std::numeric_limits<uint16_t>::max()) {
621 next_port_ = kFirstEphemeralPort;
622 } else {
623 next_port_++;
624 }
625 return out;
626 }
627
UnbindReceiver(uint16_t port)628 void EmulatedEndpointImpl::UnbindReceiver(uint16_t port) {
629 MutexLock lock(&receiver_lock_);
630 RTC_LOG(LS_INFO) << "Receiver is removed on port " << port
631 << " from endpoint " << options_.log_name
632 << "; id=" << options_.id;
633 port_to_receiver_.erase(port);
634 }
635
BindDefaultReceiver(EmulatedNetworkReceiverInterface * receiver)636 void EmulatedEndpointImpl::BindDefaultReceiver(
637 EmulatedNetworkReceiverInterface* receiver) {
638 MutexLock lock(&receiver_lock_);
639 RTC_CHECK(!default_receiver_.has_value())
640 << "Endpoint " << options_.log_name << "; id=" << options_.id
641 << " already has default receiver";
642 RTC_LOG(LS_INFO) << "Default receiver is binded to endpoint "
643 << options_.log_name << "; id=" << options_.id;
644 default_receiver_ = receiver;
645 }
646
UnbindDefaultReceiver()647 void EmulatedEndpointImpl::UnbindDefaultReceiver() {
648 MutexLock lock(&receiver_lock_);
649 RTC_LOG(LS_INFO) << "Default receiver is removed from endpoint "
650 << options_.log_name << "; id=" << options_.id;
651 default_receiver_ = absl::nullopt;
652 }
653
GetPeerLocalAddress() const654 rtc::IPAddress EmulatedEndpointImpl::GetPeerLocalAddress() const {
655 return options_.ip;
656 }
657
OnPacketReceived(EmulatedIpPacket packet)658 void EmulatedEndpointImpl::OnPacketReceived(EmulatedIpPacket packet) {
659 RTC_DCHECK_RUN_ON(task_queue_);
660 if (!options_.allow_receive_packets_with_different_dest_ip) {
661 RTC_CHECK(packet.to.ipaddr() == options_.ip)
662 << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
663 << packet.to.ipaddr().ToString()
664 << "; Receiver options_.ip=" << options_.ip.ToString();
665 }
666 MutexLock lock(&receiver_lock_);
667 stats_builder_.OnPacketReceived(clock_->CurrentTime(), packet.from.ipaddr(),
668 DataSize::Bytes(packet.ip_packet_size()));
669 auto it = port_to_receiver_.find(packet.to.port());
670 if (it == port_to_receiver_.end()) {
671 if (default_receiver_.has_value()) {
672 (*default_receiver_)->OnPacketReceived(std::move(packet));
673 return;
674 }
675 // It can happen, that remote peer closed connection, but there still some
676 // packets, that are going to it. It can happen during peer connection close
677 // process: one peer closed connection, second still sending data.
678 RTC_LOG(LS_INFO) << "Drop packet: no receiver registered in "
679 << options_.log_name << "; id=" << options_.id
680 << " on port " << packet.to.port()
681 << ". Packet source: " << packet.from.ToString();
682 stats_builder_.OnPacketDropped(packet.from.ipaddr(),
683 DataSize::Bytes(packet.ip_packet_size()));
684 return;
685 }
686 // Endpoint holds lock during packet processing to ensure that a call to
687 // UnbindReceiver followed by a delete of the receiver cannot race with this
688 // call to OnPacketReceived.
689 it->second.receiver->OnPacketReceived(std::move(packet));
690
691 if (it->second.is_one_shot) {
692 port_to_receiver_.erase(it);
693 }
694 }
695
Enable()696 void EmulatedEndpointImpl::Enable() {
697 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
698 RTC_CHECK(!is_enabled_);
699 is_enabled_ = true;
700 }
701
Disable()702 void EmulatedEndpointImpl::Disable() {
703 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
704 RTC_CHECK(is_enabled_);
705 is_enabled_ = false;
706 }
707
Enabled() const708 bool EmulatedEndpointImpl::Enabled() const {
709 RTC_DCHECK_RUN_ON(&enabled_state_checker_);
710 return is_enabled_;
711 }
712
stats() const713 EmulatedNetworkStats EmulatedEndpointImpl::stats() const {
714 RTC_DCHECK_RUN_ON(task_queue_);
715 return stats_builder_.Build();
716 }
717
LookupByLocalAddress(const rtc::IPAddress & local_ip) const718 EmulatedEndpointImpl* EndpointsContainer::LookupByLocalAddress(
719 const rtc::IPAddress& local_ip) const {
720 for (auto* endpoint : endpoints_) {
721 rtc::IPAddress peer_local_address = endpoint->GetPeerLocalAddress();
722 if (peer_local_address == local_ip) {
723 return endpoint;
724 }
725 }
726 RTC_CHECK(false) << "No network found for address" << local_ip.ToString();
727 }
728
EndpointsContainer(const std::vector<EmulatedEndpointImpl * > & endpoints,EmulatedNetworkStatsGatheringMode stats_gathering_mode)729 EndpointsContainer::EndpointsContainer(
730 const std::vector<EmulatedEndpointImpl*>& endpoints,
731 EmulatedNetworkStatsGatheringMode stats_gathering_mode)
732 : endpoints_(endpoints), stats_gathering_mode_(stats_gathering_mode) {}
733
HasEndpoint(EmulatedEndpointImpl * endpoint) const734 bool EndpointsContainer::HasEndpoint(EmulatedEndpointImpl* endpoint) const {
735 for (auto* e : endpoints_) {
736 if (e->GetId() == endpoint->GetId()) {
737 return true;
738 }
739 }
740 return false;
741 }
742
743 std::vector<std::unique_ptr<rtc::Network>>
GetEnabledNetworks() const744 EndpointsContainer::GetEnabledNetworks() const {
745 std::vector<std::unique_ptr<rtc::Network>> networks;
746 for (auto* endpoint : endpoints_) {
747 if (endpoint->Enabled()) {
748 networks.emplace_back(
749 std::make_unique<rtc::Network>(endpoint->network()));
750 }
751 }
752 return networks;
753 }
754
GetEndpoints() const755 std::vector<EmulatedEndpoint*> EndpointsContainer::GetEndpoints() const {
756 return std::vector<EmulatedEndpoint*>(endpoints_.begin(), endpoints_.end());
757 }
758
GetStats() const759 EmulatedNetworkStats EndpointsContainer::GetStats() const {
760 EmulatedNetworkStatsBuilder stats_builder(stats_gathering_mode_);
761 for (auto* endpoint : endpoints_) {
762 stats_builder.AddEmulatedNetworkStats(endpoint->stats());
763 }
764 return stats_builder.Build();
765 }
766
767 } // namespace webrtc
768