1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include "../include/endpoint_manager_impl.hpp"
7
8 #include <vsomeip/internal/logger.hpp>
9
10 #include "../include/udp_client_endpoint_impl.hpp"
11 #include "../include/udp_server_endpoint_impl.hpp"
12 #include "../include/tcp_client_endpoint_impl.hpp"
13 #include "../include/tcp_server_endpoint_impl.hpp"
14 #include "../include/local_server_endpoint_impl.hpp"
15 #include "../include/virtual_server_endpoint_impl.hpp"
16 #include "../include/endpoint_definition.hpp"
17 #include "../../routing/include/routing_manager_base.hpp"
18 #include "../../routing/include/routing_manager_impl.hpp"
19 #include "../../routing/include/routing_host.hpp"
20 #include "../../security/include/security.hpp"
21 #include "../../utility/include/utility.hpp"
22 #include "../../utility/include/byteorder.hpp"
23
24
25 #include <forward_list>
26 #include <iomanip>
27
28 #ifndef WITHOUT_SYSTEMD
29 #include <systemd/sd-daemon.h>
30 #endif
31 #define SD_LISTEN_FDS_START 3
32
33 namespace vsomeip_v3 {
34
endpoint_manager_impl(routing_manager_base * const _rm,boost::asio::io_service & _io,const std::shared_ptr<configuration> & _configuration)35 endpoint_manager_impl::endpoint_manager_impl(
36 routing_manager_base* const _rm, boost::asio::io_service& _io,
37 const std::shared_ptr<configuration>& _configuration) :
38 endpoint_manager_base(_rm, _io, _configuration) {
39 }
40
find_or_create_remote_client(service_t _service,instance_t _instance,bool _reliable)41 std::shared_ptr<endpoint> endpoint_manager_impl::find_or_create_remote_client(
42 service_t _service, instance_t _instance, bool _reliable) {
43 std::shared_ptr<endpoint> its_endpoint;
44 bool start_endpoint(false);
45 {
46 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
47 its_endpoint = find_remote_client(_service, _instance, _reliable);
48 if (!its_endpoint) {
49 its_endpoint = create_remote_client(_service, _instance, _reliable);
50 start_endpoint = true;
51 }
52 }
53 if (start_endpoint && its_endpoint
54 && configuration_->is_someip(_service, _instance)) {
55 its_endpoint->start();
56 }
57 return its_endpoint;
58 }
59
find_or_create_remote_client(service_t _service,instance_t _instance)60 void endpoint_manager_impl::find_or_create_remote_client(
61 service_t _service, instance_t _instance) {
62 std::shared_ptr<endpoint> its_reliable_endpoint;
63 std::shared_ptr<endpoint> its_unreliable_endpoint;
64 bool start_reliable_endpoint(false);
65 bool start_unreliable_endpoint(false);
66 {
67 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
68 its_reliable_endpoint = find_remote_client(_service, _instance, true);
69 if (!its_reliable_endpoint) {
70 its_reliable_endpoint = create_remote_client(_service, _instance, true);
71 start_reliable_endpoint = true;
72 }
73 its_unreliable_endpoint = find_remote_client(_service, _instance, false);
74 if (!its_unreliable_endpoint) {
75 its_unreliable_endpoint = create_remote_client(_service, _instance, false);
76 start_unreliable_endpoint = true;
77 }
78 }
79 const bool is_someip = configuration_->is_someip(_service, _instance);
80 if (start_reliable_endpoint && its_reliable_endpoint && is_someip) {
81 its_reliable_endpoint->start();
82 }
83 if (start_unreliable_endpoint && its_unreliable_endpoint && is_someip) {
84 its_unreliable_endpoint->start();
85 }
86 }
87
is_remote_service_known(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,const boost::asio::ip::address & _reliable_address,uint16_t _reliable_port,bool * _reliable_known,const boost::asio::ip::address & _unreliable_address,uint16_t _unreliable_port,bool * _unreliable_known) const88 void endpoint_manager_impl::is_remote_service_known(
89 service_t _service, instance_t _instance, major_version_t _major,
90 minor_version_t _minor,
91 const boost::asio::ip::address &_reliable_address,
92 uint16_t _reliable_port, bool* _reliable_known,
93 const boost::asio::ip::address &_unreliable_address,
94 uint16_t _unreliable_port, bool* _unreliable_known) const {
95
96 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
97 auto found_service = remote_service_info_.find(_service);
98 if (found_service != remote_service_info_.end()) {
99 auto found_instance = found_service->second.find(_instance);
100 if (found_instance != found_service->second.end()) {
101 std::shared_ptr<endpoint_definition> its_definition;
102 if (_reliable_port != ILLEGAL_PORT) {
103 auto found_reliable = found_instance->second.find(true);
104 if (found_reliable != found_instance->second.end()) {
105 its_definition = found_reliable->second;
106 if (its_definition->get_address() == _reliable_address
107 && its_definition->get_port() == _reliable_port) {
108 *_reliable_known = true;
109 } else {
110 VSOMEIP_WARNING << "Reliable service endpoint has changed: ["
111 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
112 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
113 << std::dec << static_cast<std::uint32_t>(_major) << "."
114 << std::dec << _minor << "] old: "
115 << its_definition->get_address().to_string() << ":"
116 << its_definition->get_port() << " new: "
117 << _reliable_address.to_string() << ":"
118 << _reliable_port;
119 }
120 }
121 }
122 if (_unreliable_port != ILLEGAL_PORT) {
123 auto found_unreliable = found_instance->second.find(false);
124 if (found_unreliable != found_instance->second.end()) {
125 its_definition = found_unreliable->second;
126 if (its_definition->get_address() == _unreliable_address
127 && its_definition->get_port() == _unreliable_port) {
128 *_unreliable_known = true;
129 } else {
130 VSOMEIP_WARNING << "Unreliable service endpoint has changed: ["
131 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
132 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
133 << std::dec << static_cast<std::uint32_t>(_major) << "."
134 << std::dec << _minor << "] old: "
135 << its_definition->get_address().to_string() << ":"
136 << its_definition->get_port() << " new: "
137 << _unreliable_address.to_string() << ":"
138 << _unreliable_port;
139 }
140 }
141 }
142 }
143 }
144 }
145
add_remote_service_info(service_t _service,instance_t _instance,const std::shared_ptr<endpoint_definition> & _ep_definition)146 void endpoint_manager_impl::add_remote_service_info(
147 service_t _service, instance_t _instance,
148 const std::shared_ptr<endpoint_definition>& _ep_definition) {
149
150 std::shared_ptr<serviceinfo> its_info;
151 std::shared_ptr<endpoint> its_endpoint;
152 bool must_report(false);
153 {
154 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
155 remote_service_info_[_service][_instance][_ep_definition->is_reliable()] =
156 _ep_definition;
157
158 if (_ep_definition->is_reliable()) {
159 its_endpoint = find_remote_client(_service, _instance, true);
160 must_report = (its_endpoint && its_endpoint->is_established_or_connected());
161 if (must_report)
162 its_info = rm_->find_service(_service, _instance);
163 }
164 }
165
166 if (must_report)
167 static_cast<routing_manager_impl*>(rm_)->service_endpoint_connected(
168 _service, _instance, its_info->get_major(), its_info->get_minor(),
169 its_endpoint, false);
170 }
171
add_remote_service_info(service_t _service,instance_t _instance,const std::shared_ptr<endpoint_definition> & _ep_definition_reliable,const std::shared_ptr<endpoint_definition> & _ep_definition_unreliable)172 void endpoint_manager_impl::add_remote_service_info(
173 service_t _service, instance_t _instance,
174 const std::shared_ptr<endpoint_definition>& _ep_definition_reliable,
175 const std::shared_ptr<endpoint_definition>& _ep_definition_unreliable) {
176
177 std::shared_ptr<serviceinfo> its_info;
178 std::shared_ptr<endpoint> its_reliable, its_unreliable;
179 bool must_report(false);
180 {
181 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
182 remote_service_info_[_service][_instance][false] = _ep_definition_unreliable;
183 remote_service_info_[_service][_instance][true] = _ep_definition_reliable;
184
185 its_unreliable = find_remote_client(_service, _instance, false);
186 its_reliable = find_remote_client(_service, _instance, true);
187
188 must_report = (its_unreliable && its_unreliable->is_established_or_connected()
189 && its_reliable && its_reliable->is_established_or_connected());
190
191 if (must_report)
192 its_info = rm_->find_service(_service, _instance);
193 }
194
195 if (must_report) {
196 static_cast<routing_manager_impl*>(rm_)->service_endpoint_connected(
197 _service, _instance, its_info->get_major(), its_info->get_minor(),
198 its_unreliable, false);
199 static_cast<routing_manager_impl*>(rm_)->service_endpoint_connected(
200 _service, _instance, its_info->get_major(), its_info->get_minor(),
201 its_reliable, false);
202 }
203 }
204
clear_remote_service_info(service_t _service,instance_t _instance,bool _reliable)205 void endpoint_manager_impl::clear_remote_service_info(service_t _service,
206 instance_t _instance,
207 bool _reliable) {
208 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
209 const auto found_service = remote_service_info_.find(_service);
210 if (found_service != remote_service_info_.end()) {
211 const auto found_instance = found_service->second.find(_instance);
212 if (found_instance != found_service->second.end()) {
213 if (found_instance->second.erase(_reliable)) {
214 if (!found_instance->second.size()) {
215 found_service->second.erase(found_instance);
216 if (!found_service->second.size()) {
217 remote_service_info_.erase(found_service);
218 }
219 }
220 }
221 }
222 }
223 }
224
create_server_endpoint(uint16_t _port,bool _reliable,bool _start)225 std::shared_ptr<endpoint> endpoint_manager_impl::create_server_endpoint(
226 uint16_t _port, bool _reliable, bool _start) {
227 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
228 std::shared_ptr<endpoint> its_endpoint;
229 try {
230 boost::asio::ip::address its_unicast = configuration_->get_unicast_address();
231 const std::string its_unicast_str = its_unicast.to_string();
232 if (_start) {
233 if (_reliable) {
234 its_endpoint = std::make_shared<tcp_server_endpoint_impl>(
235 shared_from_this(),
236 rm_->shared_from_this(),
237 boost::asio::ip::tcp::endpoint(its_unicast, _port),
238 io_,
239 configuration_);
240 if (configuration_->has_enabled_magic_cookies(
241 its_unicast_str, _port) ||
242 configuration_->has_enabled_magic_cookies(
243 "local", _port)) {
244 its_endpoint->enable_magic_cookies();
245 }
246 } else {
247 its_endpoint = std::make_shared<udp_server_endpoint_impl>(
248 shared_from_this(),
249 rm_->shared_from_this(),
250 boost::asio::ip::udp::endpoint(its_unicast, _port),
251 io_,
252 configuration_);
253 }
254
255 } else {
256 its_endpoint = std::make_shared<virtual_server_endpoint_impl>(
257 its_unicast_str, _port, _reliable, io_);
258 }
259
260 if (its_endpoint) {
261 server_endpoints_[_port][_reliable] = its_endpoint;
262 its_endpoint->start();
263 }
264 } catch (const std::exception &e) {
265 VSOMEIP_ERROR << __func__
266 << " Server endpoint creation failed."
267 << " Reason: "<< e.what()
268 << " Port: " << _port
269 << " (reliable="
270 << (_reliable ? "reliable" : "unreliable")
271 << ")";
272 }
273
274 return (its_endpoint);
275 }
276
find_server_endpoint(uint16_t _port,bool _reliable) const277 std::shared_ptr<endpoint> endpoint_manager_impl::find_server_endpoint(
278 uint16_t _port, bool _reliable) const {
279 std::shared_ptr<endpoint> its_endpoint;
280 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
281 auto found_port = server_endpoints_.find(_port);
282 if (found_port != server_endpoints_.end()) {
283 auto found_endpoint = found_port->second.find(_reliable);
284 if (found_endpoint != found_port->second.end()) {
285 its_endpoint = found_endpoint->second;
286 }
287 }
288 return (its_endpoint);
289 }
290
find_or_create_server_endpoint(uint16_t _port,bool _reliable,bool _start,service_t _service,instance_t _instance,bool & _is_found,bool _is_multicast)291 std::shared_ptr<endpoint> endpoint_manager_impl::find_or_create_server_endpoint(
292 uint16_t _port, bool _reliable, bool _start, service_t _service,
293 instance_t _instance, bool &_is_found, bool _is_multicast) {
294 std::shared_ptr<endpoint> its_endpoint = find_server_endpoint(_port,
295 _reliable);
296 _is_found = false;
297 if (!its_endpoint) {
298 its_endpoint = create_server_endpoint(_port, _reliable, _start);
299 } else {
300 _is_found = true;
301 }
302 if (its_endpoint) {
303 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
304 if (!_is_multicast) {
305 service_instances_[_service][its_endpoint.get()] = _instance;
306 }
307 its_endpoint->increment_use_count();
308 }
309 return (its_endpoint);
310 }
311
remove_server_endpoint(uint16_t _port,bool _reliable)312 bool endpoint_manager_impl::remove_server_endpoint(uint16_t _port, bool _reliable) {
313 bool ret = false;
314 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
315 auto found_port = server_endpoints_.find(_port);
316 if (found_port != server_endpoints_.end()) {
317 auto found_reliable = found_port->second.find(_reliable);
318 if (found_reliable != found_port->second.end()) {
319 if (found_reliable->second->get_use_count() == 0 &&
320 found_port->second.erase(_reliable)) {
321 ret = true;
322 if (found_port->second.empty()) {
323 server_endpoints_.erase(found_port);
324 }
325 }
326 }
327 }
328 return ret;
329 }
330
331 void
clear_client_endpoints(service_t _service,instance_t _instance,bool _reliable)332 endpoint_manager_impl::clear_client_endpoints(
333 service_t _service, instance_t _instance, bool _reliable) {
334
335 std::shared_ptr<endpoint> endpoint_to_delete;
336 bool other_services_reachable_through_endpoint(false);
337 {
338 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
339 // Clear client endpoints for remote services (generic and specific ones)
340 const auto found_service = remote_services_.find(_service);
341 if (found_service != remote_services_.end()) {
342 const auto found_instance = found_service->second.find(_instance);
343 if (found_instance != found_service->second.end()) {
344 const auto found_reliability = found_instance->second.find(_reliable);
345 if (found_reliability != found_instance->second.end()) {
346 service_instances_[_service].erase(found_reliability->second.get());
347 endpoint_to_delete = found_reliability->second;
348
349 found_instance->second.erase(found_reliability);
350 if (found_instance->second.empty()) {
351 found_service->second.erase(found_instance);
352 if (found_service->second.empty()) {
353 remote_services_.erase(found_service);
354 }
355 }
356 }
357 }
358 }
359
360 // Only stop and delete the endpoint if none of the services
361 // reachable through it is online anymore.
362 if (endpoint_to_delete) {
363 for (const auto& service : remote_services_) {
364 for (const auto& instance : service.second) {
365 const auto found_reliability = instance.second.find(_reliable);
366 if (found_reliability != instance.second.end()
367 && found_reliability->second == endpoint_to_delete) {
368 other_services_reachable_through_endpoint = true;
369 break;
370 }
371 }
372 if (other_services_reachable_through_endpoint) { break; }
373 }
374
375 if (!other_services_reachable_through_endpoint) {
376 partition_id_t its_partition;
377 boost::asio::ip::address its_address;
378 std::uint16_t its_port(0);
379
380 its_partition = configuration_->get_partition_id(_service, _instance);
381
382 if (_reliable) {
383 std::shared_ptr<tcp_client_endpoint_impl> ep =
384 std::dynamic_pointer_cast<tcp_client_endpoint_impl>(endpoint_to_delete);
385 if (ep) {
386 its_port = ep->get_remote_port();
387 ep->get_remote_address(its_address);
388 }
389 } else {
390 std::shared_ptr<udp_client_endpoint_impl> ep =
391 std::dynamic_pointer_cast<udp_client_endpoint_impl>(endpoint_to_delete);
392 if (ep) {
393 its_port = ep->get_remote_port();
394 ep->get_remote_address(its_address);
395 }
396 }
397 const auto found_ip = client_endpoints_by_ip_.find(its_address);
398 if (found_ip != client_endpoints_by_ip_.end()) {
399 const auto found_port = found_ip->second.find(its_port);
400 if (found_port != found_ip->second.end()) {
401 auto found_reliable = found_port->second.find(_reliable);
402 if (found_reliable != found_port->second.end()) {
403 const auto found_partition = found_reliable->second.find(its_partition);
404 if (found_partition != found_reliable->second.end()) {
405 if (found_partition->second == endpoint_to_delete) {
406 found_reliable->second.erase(its_partition);
407 // delete if necessary
408 if (0 == found_reliable->second.size()) {
409 found_port->second.erase(_reliable);
410 if (0 == found_port->second.size()) {
411 found_ip->second.erase(found_port);
412 if (0 == found_ip->second.size()) {
413 client_endpoints_by_ip_.erase(found_ip);
414 }
415 }
416 }
417 }
418 }
419 }
420 }
421 }
422 }
423 }
424 }
425 if (!other_services_reachable_through_endpoint && endpoint_to_delete) {
426 endpoint_to_delete->stop();
427 }
428 }
429
find_or_create_multicast_endpoint(service_t _service,instance_t _instance,const boost::asio::ip::address & _sender,const boost::asio::ip::address & _address,uint16_t _port)430 void endpoint_manager_impl::find_or_create_multicast_endpoint(
431 service_t _service, instance_t _instance,
432 const boost::asio::ip::address &_sender,
433 const boost::asio::ip::address &_address, uint16_t _port) {
434 bool multicast_known(false);
435 {
436 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
437 const auto found_service = multicast_info.find(_service);
438 if (found_service != multicast_info.end()) {
439 const auto found_instance = found_service->second.find(_instance);
440 if (found_instance != found_service->second.end()) {
441 const auto& endpoint_def = found_instance->second;
442 if (endpoint_def->get_address() == _address &&
443 endpoint_def->get_port() == _port) {
444 // Multicast info and endpoint already created before
445 // This can happen when more than one client subscribe on the same instance!
446 multicast_known = true;
447 }
448 }
449 }
450 }
451 const bool is_someip = configuration_->is_someip(_service, _instance);
452 bool _is_found(false);
453 // Create multicast endpoint & join multicase group
454 std::shared_ptr<endpoint> its_endpoint = find_or_create_server_endpoint(
455 _port, false, is_someip, _service, _instance, _is_found, true);
456 if (!_is_found) {
457 // Only save multicast info if we created a new endpoint
458 // to be able to delete the new endpoint
459 // as soon as the instance stops offering its service
460 std::shared_ptr<endpoint_definition> endpoint_def =
461 endpoint_definition::get(_address, _port, false, _service, _instance);
462 multicast_info[_service][_instance] = endpoint_def;
463 }
464
465 if (its_endpoint) {
466 if (!multicast_known) {
467 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
468 service_instances_multicast_[_service][_sender] = _instance;
469 }
470
471 auto its_udp_server_endpoint
472 = std::dynamic_pointer_cast<udp_server_endpoint_impl>(its_endpoint);
473 if (_port != configuration_->get_sd_port()) {
474 its_udp_server_endpoint->join(_address.to_string());
475 } else {
476 its_udp_server_endpoint->join_unlocked(_address.to_string());
477 }
478 } else {
479 VSOMEIP_ERROR <<"Could not find/create multicast endpoint!";
480 }
481 }
482
clear_multicast_endpoints(service_t _service,instance_t _instance)483 void endpoint_manager_impl::clear_multicast_endpoints(service_t _service, instance_t _instance) {
484 std::shared_ptr<endpoint> multicast_endpoint;
485 std::string address;
486
487 {
488 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
489 // Clear multicast info and endpoint and multicast instance (remote service)
490 if (multicast_info.find(_service) != multicast_info.end()) {
491 if (multicast_info[_service].find(_instance) != multicast_info[_service].end()) {
492 address = multicast_info[_service][_instance]->get_address().to_string();
493 uint16_t port = multicast_info[_service][_instance]->get_port();
494 auto found_port = server_endpoints_.find(port);
495 if (found_port != server_endpoints_.end()) {
496 auto found_unreliable = found_port->second.find(false);
497 if (found_unreliable != found_port->second.end()) {
498 multicast_endpoint = found_unreliable->second;
499 server_endpoints_[port].erase(false);
500 }
501 if (found_port->second.find(true) == found_port->second.end()) {
502 server_endpoints_.erase(port);
503 }
504 }
505 multicast_info[_service].erase(_instance);
506 if (0 >= multicast_info[_service].size()) {
507 multicast_info.erase(_service);
508 }
509 (void)remove_instance_multicast(_service, _instance);
510 }
511 }
512 }
513 if (multicast_endpoint) {
514 dynamic_cast<udp_server_endpoint_impl*>(
515 multicast_endpoint.get())->leave(address);
516
517 multicast_endpoint->stop();
518 }
519 }
520
supports_selective(service_t _service,instance_t _instance) const521 bool endpoint_manager_impl::supports_selective(service_t _service,
522 instance_t _instance) const {
523 bool supports_selective(false);
524 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
525 const auto its_service = remote_service_info_.find(_service);
526 if (its_service != remote_service_info_.end()) {
527 const auto its_instance = its_service->second.find(_instance);
528 if (its_instance != its_service->second.end()) {
529 for (const auto& its_reliable : its_instance->second) {
530 supports_selective |= configuration_->
531 supports_selective_broadcasts(
532 its_reliable.second->get_address());
533 }
534 }
535 }
536 return supports_selective;
537 }
538
print_status() const539 void endpoint_manager_impl::print_status() const {
540 // local client endpoints
541 {
542 std::map<client_t, std::shared_ptr<endpoint>> lces = get_local_endpoints();
543 VSOMEIP_INFO << "status local client endpoints: " << std::dec << lces.size();
544 for (const auto& lce : lces) {
545 lce.second->print_status();
546 }
547 }
548
549 // udp and tcp client endpoints
550 {
551 client_endpoints_by_ip_t client_endpoints_by_ip;
552 server_endpoints_t server_endpoints;
553 {
554 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
555 client_endpoints_by_ip = client_endpoints_by_ip_;
556 server_endpoints = server_endpoints_;
557 }
558 VSOMEIP_INFO << "status start remote client endpoints:";
559 std::uint32_t num_remote_client_endpoints(0);
560 // normal endpoints
561 for (const auto &its_address : client_endpoints_by_ip) {
562 for (const auto &its_port : its_address.second) {
563 for (const auto &its_reliability : its_port.second) {
564 for (const auto &its_partition : its_reliability.second) {
565 its_partition.second->print_status();
566 num_remote_client_endpoints++;
567 }
568 }
569 }
570 }
571 VSOMEIP_INFO << "status end remote client endpoints: " << std::dec
572 << num_remote_client_endpoints;
573
574 VSOMEIP_INFO << "status start server endpoints:";
575 std::uint32_t num_server_endpoints(1);
576 // local server endpoints
577 static_cast<routing_manager_impl*>(rm_)->print_stub_status();
578
579 // server endpoints
580 for (const auto& p : server_endpoints) {
581 for (const auto& ru : p.second ) {
582 ru.second->print_status();
583 num_server_endpoints++;
584 }
585 }
586 VSOMEIP_INFO << "status end server endpoints:"
587 << std::dec << num_server_endpoints;
588 }
589 }
590
591 std::shared_ptr<local_server_endpoint_impl>
create_local_server(bool * _is_socket_activated,const std::shared_ptr<routing_host> & _routing_host)592 endpoint_manager_impl::create_local_server(
593 bool* _is_socket_activated,
594 const std::shared_ptr<routing_host>& _routing_host) {
595 std::shared_ptr<local_server_endpoint_impl> its_endpoint;
596 std::stringstream its_endpoint_path_ss;
597 its_endpoint_path_ss << utility::get_base_path(configuration_) << VSOMEIP_ROUTING_CLIENT;
598 const std::string its_endpoint_path = its_endpoint_path_ss.str();
599 client_t routing_host_id = configuration_->get_id(configuration_->get_routing_host());
600 if (security::get()->is_enabled() && get_client() != routing_host_id) {
601 VSOMEIP_ERROR << "endpoint_manager_impl::create_local_server: "
602 << std::hex << "Client " << get_client() << " isn't allowed"
603 << " to create the routing endpoint as its not configured as the routing master!";
604 return its_endpoint;
605 }
606 uint32_t native_socket_fd = 0;
607 int32_t num_fd = 0;
608 #ifndef WITHOUT_SYSTEMD
609 num_fd = sd_listen_fds(0);
610 #endif
611 if (num_fd > 1) {
612 VSOMEIP_ERROR << "Too many file descriptors received by systemd socket activation! num_fd: " << num_fd;
613 } else if (num_fd == 1) {
614 native_socket_fd = SD_LISTEN_FDS_START + 0;
615 VSOMEIP_INFO << "Using native socket created by systemd socket activation! fd: " << native_socket_fd;
616 #ifndef _WIN32
617 try {
618 its_endpoint =
619 std::make_shared <local_server_endpoint_impl>(
620 shared_from_this(), _routing_host,
621 boost::asio::local::stream_protocol_ext::endpoint(its_endpoint_path),
622 io_,
623 native_socket_fd,
624 configuration_, true);
625 } catch (const std::exception &e) {
626 VSOMEIP_ERROR << "Server endpoint creation failed. Client ID: "
627 << std::hex << std::setw(4) << std::setfill('0')
628 << VSOMEIP_ROUTING_CLIENT << ": " << e.what();
629 }
630 #endif
631 *_is_socket_activated = true;
632 } else {
633 #if _WIN32
634 ::_unlink(its_endpoint_path.c_str());
635 int port = VSOMEIP_INTERNAL_BASE_PORT;
636 VSOMEIP_INFO << "Routing endpoint at " << port;
637 #else
638 if (-1 == ::unlink(its_endpoint_path.c_str()) && errno != ENOENT) {
639 VSOMEIP_ERROR << "endpoint_manager_impl::create_local_server unlink failed ("
640 << its_endpoint_path << "): "<< std::strerror(errno);
641 }
642 VSOMEIP_INFO << __func__ << " Routing endpoint at " << its_endpoint_path;
643 #endif
644
645 try {
646 its_endpoint =
647 std::make_shared <local_server_endpoint_impl>(
648 shared_from_this(), _routing_host,
649 #ifdef _WIN32
650 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port),
651 #else
652 boost::asio::local::stream_protocol_ext::endpoint(its_endpoint_path),
653 #endif
654 io_,
655 configuration_, true);
656 } catch (const std::exception &e) {
657 VSOMEIP_ERROR << "Server endpoint creation failed. Client ID: "
658 << std::hex << std::setw(4) << std::setfill('0')
659 << VSOMEIP_ROUTING_CLIENT << ": " << e.what();
660 }
661 *_is_socket_activated = false;
662 }
663 return its_endpoint;
664 }
665
find_instance(service_t _service,endpoint * const _endpoint) const666 instance_t endpoint_manager_impl::find_instance(
667 service_t _service, endpoint* const _endpoint) const {
668 instance_t its_instance(0xFFFF);
669 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
670 auto found_service = service_instances_.find(_service);
671 if (found_service != service_instances_.end()) {
672 auto found_endpoint = found_service->second.find(_endpoint);
673 if (found_endpoint != found_service->second.end()) {
674 its_instance = found_endpoint->second;
675 }
676 }
677 return its_instance;
678 }
679
find_instance_multicast(service_t _service,const boost::asio::ip::address & _sender) const680 instance_t endpoint_manager_impl::find_instance_multicast(
681 service_t _service, const boost::asio::ip::address &_sender) const {
682 instance_t its_instance(0xFFFF);
683 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
684 auto found_service = service_instances_multicast_.find(_service);
685 if (found_service != service_instances_multicast_.end()) {
686 auto found_sender = found_service->second.find(_sender);
687 if (found_sender != found_service->second.end()) {
688 its_instance = found_sender->second;
689 }
690 }
691 return its_instance;
692 }
693
remove_instance(service_t _service,endpoint * const _endpoint)694 bool endpoint_manager_impl::remove_instance(service_t _service,
695 endpoint* const _endpoint) {
696 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
697 auto found_service = service_instances_.find(_service);
698 if (found_service != service_instances_.end()) {
699 if (found_service->second.erase(_endpoint)) {
700 if (!found_service->second.size()) {
701 service_instances_.erase(found_service);
702 }
703 }
704 }
705 _endpoint->decrement_use_count();
706 return (_endpoint->get_use_count() == 0);
707 }
708
remove_instance_multicast(service_t _service,instance_t _instance)709 bool endpoint_manager_impl::remove_instance_multicast(service_t _service,
710 instance_t _instance) {
711 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
712 auto found_service = service_instances_multicast_.find(_service);
713 if (found_service != service_instances_multicast_.end()) {
714 for (auto &its_sender : found_service->second) {
715 if (its_sender.second == _instance) {
716 if (found_service->second.erase(its_sender.first)) {
717 if (!found_service->second.size()) {
718 service_instances_multicast_.erase(_service);
719 }
720 }
721 return (true);
722 }
723 }
724 }
725 return (false);
726 }
727
on_connect(std::shared_ptr<endpoint> _endpoint)728 void endpoint_manager_impl::on_connect(std::shared_ptr<endpoint> _endpoint) {
729 // Is called when endpoint->connect succeeded!
730 struct service_info {
731 service_t service_id_;
732 instance_t instance_id_;
733 major_version_t major_;
734 minor_version_t minor_;
735 std::shared_ptr<endpoint> endpoint_;
736 bool service_is_unreliable_only_;
737 };
738
739 // Set to state CONNECTED as connection is not yet fully established in remote side POV
740 // but endpoint is ready to send / receive. Set to ESTABLISHED after timer expires
741 // to prevent inserting subscriptions twice or send out subscription before remote side
742 // is finished with TCP 3 way handshake
743 _endpoint->set_connected(true);
744
745 std::forward_list<struct service_info> services_to_report_;
746 {
747 const bool endpoint_is_reliable = _endpoint->is_reliable();
748 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
749 for (auto &its_service : remote_services_) {
750 for (auto &its_instance : its_service.second) {
751 auto found_endpoint = its_instance.second.find(endpoint_is_reliable);
752 if (found_endpoint != its_instance.second.end()) {
753 if (found_endpoint->second == _endpoint) {
754 std::shared_ptr<serviceinfo> its_info(
755 rm_->find_service(its_service.first,
756 its_instance.first));
757 if (!its_info) {
758 _endpoint->set_established(true);
759 return;
760 }
761 // only report services offered via TCP+UDP when both
762 // endpoints are connected
763 const auto its_other_endpoint = its_info->get_endpoint(
764 !endpoint_is_reliable);
765
766 if (!its_other_endpoint || (its_other_endpoint
767 && its_other_endpoint->is_established_or_connected())) {
768 services_to_report_.push_front(
769 { its_service.first,
770 its_instance.first,
771 its_info->get_major(),
772 its_info->get_minor(),
773 _endpoint,
774 (!endpoint_is_reliable &&
775 !its_other_endpoint)});
776 }
777 }
778 }
779 }
780 }
781 }
782 for (const auto &s : services_to_report_) {
783 static_cast<routing_manager_impl*>(rm_)->service_endpoint_connected(
784 s.service_id_, s.instance_id_, s.major_, s.minor_, s.endpoint_,
785 s.service_is_unreliable_only_);
786 }
787 if (services_to_report_.empty()) {
788 _endpoint->set_established(true);
789 }
790 }
791
on_disconnect(std::shared_ptr<endpoint> _endpoint)792 void endpoint_manager_impl::on_disconnect(std::shared_ptr<endpoint> _endpoint) {
793 // Is called when endpoint->connect fails!
794 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
795 for (auto &its_service : remote_services_) {
796 for (auto &its_instance : its_service.second) {
797 const bool is_reliable = _endpoint->is_reliable();
798 auto found_endpoint = its_instance.second.find(is_reliable);
799 if (found_endpoint != its_instance.second.end()) {
800 if (found_endpoint->second == _endpoint) {
801 std::shared_ptr<serviceinfo> its_info(
802 rm_->find_service(its_service.first,
803 its_instance.first));
804 if(!its_info){
805 return;
806 }
807 if (!is_reliable) {
808 static_cast<routing_manager_impl*>(rm_)->on_availability(
809 its_service.first, its_instance.first,
810 false, its_info->get_major(),
811 its_info->get_minor());
812 }
813 static_cast<routing_manager_impl*>(rm_)->service_endpoint_disconnected(
814 its_service.first, its_instance.first,
815 its_info->get_major(),
816 its_info->get_minor(), _endpoint);
817 }
818 }
819 }
820 }
821 }
822
on_bind_error(std::shared_ptr<endpoint> _endpoint,std::uint16_t _remote_port)823 bool endpoint_manager_impl::on_bind_error(std::shared_ptr<endpoint> _endpoint, std::uint16_t _remote_port) {
824 std::lock_guard<std::recursive_mutex> its_ep_lock(endpoint_mutex_);
825 for (auto &its_service : remote_services_) {
826 for (auto &its_instance : its_service.second) {
827 const bool is_reliable = _endpoint->is_reliable();
828 auto found_endpoint = its_instance.second.find(is_reliable);
829 if (found_endpoint != its_instance.second.end()) {
830 if (found_endpoint->second == _endpoint) {
831 // get a new client port using service / instance / remote port
832 uint16_t its_old_local_port = _endpoint->get_local_port();
833 uint16_t its_new_local_port(ILLEGAL_PORT);
834
835 std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_);
836 if (configuration_->get_client_port(its_service.first,
837 its_instance.first,
838 _remote_port,
839 is_reliable,
840 used_client_ports_,
841 its_new_local_port)) {
842 _endpoint->set_local_port(its_new_local_port);
843 its_lock.unlock();
844 release_port(its_old_local_port, _endpoint->is_reliable());
845 return true;
846 }
847 }
848 }
849 }
850 }
851 return false;
852 }
853
on_error(const byte_t * _data,length_t _length,endpoint * const _receiver,const boost::asio::ip::address & _remote_address,std::uint16_t _remote_port)854 void endpoint_manager_impl::on_error(
855 const byte_t *_data, length_t _length, endpoint* const _receiver,
856 const boost::asio::ip::address &_remote_address,
857 std::uint16_t _remote_port) {
858 instance_t its_instance = 0;
859 if (_length >= VSOMEIP_SERVICE_POS_MAX) {
860 service_t its_service = VSOMEIP_BYTES_TO_WORD(
861 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
862 its_instance = find_instance(its_service, _receiver);
863 }
864 static_cast<routing_manager_impl*>(rm_)->send_error(
865 return_code_e::E_MALFORMED_MESSAGE, _data, _length, its_instance,
866 _receiver->is_reliable(), _receiver, _remote_address, _remote_port);
867 }
868
release_port(uint16_t _port,bool _reliable)869 void endpoint_manager_impl::release_port(uint16_t _port, bool _reliable) {
870 std::lock_guard<std::mutex> its_lock(used_client_ports_mutex_);
871 used_client_ports_[_reliable].erase(_port);
872 }
873
874 std::shared_ptr<endpoint>
find_remote_client(service_t _service,instance_t _instance,bool _reliable)875 endpoint_manager_impl::find_remote_client(
876 service_t _service, instance_t _instance, bool _reliable) {
877
878 std::shared_ptr<endpoint> its_endpoint;
879 auto found_service = remote_services_.find(_service);
880 if (found_service != remote_services_.end()) {
881 auto found_instance = found_service->second.find(_instance);
882 if (found_instance != found_service->second.end()) {
883 auto found_reliability = found_instance->second.find(_reliable);
884 if (found_reliability != found_instance->second.end()) {
885 its_endpoint = found_reliability->second;
886 }
887 }
888 }
889 if (its_endpoint) {
890 return its_endpoint;
891 }
892
893 // Endpoint did not yet exist. Get the partition id to check
894 // whether the client endpoint for the partition does exist.
895 partition_id_t its_partition_id
896 = configuration_->get_partition_id(_service, _instance);
897
898 // If another service within the same partition is hosted on the
899 // same server_endpoint reuse the existing client_endpoint.
900 auto found_service_info = remote_service_info_.find(_service);
901 if (found_service_info != remote_service_info_.end()) {
902 auto found_instance = found_service_info->second.find(_instance);
903 if (found_instance != found_service_info->second.end()) {
904 auto found_reliable = found_instance->second.find(_reliable);
905 if (found_reliable != found_instance->second.end()) {
906 std::shared_ptr<endpoint_definition> its_ep_def
907 = found_reliable->second;
908 auto found_address = client_endpoints_by_ip_.find(
909 its_ep_def->get_address());
910 if (found_address != client_endpoints_by_ip_.end()) {
911 auto found_port = found_address->second.find(
912 its_ep_def->get_remote_port());
913 if (found_port != found_address->second.end()) {
914 auto found_reliable2
915 = found_port->second.find(_reliable);
916 if (found_reliable2 != found_port->second.end()) {
917 auto found_partition
918 = found_reliable2->second.find(its_partition_id);
919 if (found_partition != found_reliable2->second.end()) {
920 its_endpoint = found_partition->second;
921
922 // store the endpoint under this service/instance id
923 // as well - needed for later cleanup
924 remote_services_[_service][_instance][_reliable]
925 = its_endpoint;
926 service_instances_[_service][its_endpoint.get()] = _instance;
927
928 // add endpoint to serviceinfo object
929 auto found_service_info = rm_->find_service(_service,_instance);
930 if (found_service_info) {
931 found_service_info->set_endpoint(its_endpoint, _reliable);
932 }
933 }
934 }
935 }
936 }
937 }
938 }
939 }
940
941 return (its_endpoint);
942 }
943
create_remote_client(service_t _service,instance_t _instance,bool _reliable)944 std::shared_ptr<endpoint> endpoint_manager_impl::create_remote_client(
945 service_t _service, instance_t _instance, bool _reliable) {
946 std::shared_ptr<endpoint> its_endpoint;
947 std::shared_ptr<endpoint_definition> its_endpoint_def;
948 uint16_t its_local_port;
949 uint16_t its_remote_port = ILLEGAL_PORT;
950
951 auto found_service = remote_service_info_.find(_service);
952 if (found_service != remote_service_info_.end()) {
953 auto found_instance = found_service->second.find(_instance);
954 if (found_instance != found_service->second.end()) {
955 auto found_reliability = found_instance->second.find(_reliable);
956 if (found_reliability != found_instance->second.end()) {
957 its_endpoint_def = found_reliability->second;
958 its_remote_port = its_endpoint_def->get_port();
959 }
960 }
961 }
962
963 if( its_remote_port != ILLEGAL_PORT) {
964 // if client port range for remote service port range is configured
965 // and remote port is in range, determine unused client port
966 std::unique_lock<std::mutex> its_lock(used_client_ports_mutex_);
967 if (configuration_->get_client_port(_service, _instance, its_remote_port, _reliable,
968 used_client_ports_, its_local_port)) {
969 if(its_endpoint_def) {
970 its_endpoint = create_client_endpoint(
971 its_endpoint_def->get_address(),
972 its_local_port,
973 its_endpoint_def->get_port(),
974 _reliable);
975 }
976
977 if (its_endpoint) {
978 partition_id_t its_partition
979 = configuration_->get_partition_id(_service, _instance);
980 used_client_ports_[_reliable].insert(its_local_port);
981 its_lock.unlock();
982 service_instances_[_service][its_endpoint.get()] = _instance;
983 remote_services_[_service][_instance][_reliable] = its_endpoint;
984
985 client_endpoints_by_ip_[its_endpoint_def->get_address()]
986 [its_endpoint_def->get_port()]
987 [_reliable]
988 [its_partition]= its_endpoint;
989 // Set the basic route to the service in the service info
990 auto found_service_info = rm_->find_service(_service, _instance);
991 if (found_service_info) {
992 found_service_info->set_endpoint(its_endpoint, _reliable);
993 }
994 boost::system::error_code ec;
995 VSOMEIP_INFO << "endpoint_manager_impl::create_remote_client: "
996 << its_endpoint_def->get_address().to_string(ec)
997 << ":" << std::dec << its_endpoint_def->get_port()
998 << " reliable: " << _reliable
999 << " using local port: " << std::dec << its_local_port;
1000 }
1001 }
1002 }
1003 return its_endpoint;
1004 }
1005
create_client_endpoint(const boost::asio::ip::address & _address,uint16_t _local_port,uint16_t _remote_port,bool _reliable)1006 std::shared_ptr<endpoint> endpoint_manager_impl::create_client_endpoint(
1007 const boost::asio::ip::address &_address,
1008 uint16_t _local_port, uint16_t _remote_port,
1009 bool _reliable) {
1010
1011 std::shared_ptr<endpoint> its_endpoint;
1012 boost::asio::ip::address its_unicast = configuration_->get_unicast_address();
1013
1014 try {
1015 if (_reliable) {
1016 its_endpoint = std::make_shared<tcp_client_endpoint_impl>(
1017 shared_from_this(),
1018 rm_->shared_from_this(),
1019 boost::asio::ip::tcp::endpoint(its_unicast, _local_port),
1020 boost::asio::ip::tcp::endpoint(_address, _remote_port),
1021 io_,
1022 configuration_);
1023
1024 if (configuration_->has_enabled_magic_cookies(_address.to_string(),
1025 _remote_port)) {
1026 its_endpoint->enable_magic_cookies();
1027 }
1028 } else {
1029 its_endpoint = std::make_shared<udp_client_endpoint_impl>(
1030 shared_from_this(),
1031 rm_->shared_from_this(),
1032 boost::asio::ip::udp::endpoint(its_unicast, _local_port),
1033 boost::asio::ip::udp::endpoint(_address, _remote_port),
1034 io_,
1035 configuration_);
1036 }
1037 } catch (...) {
1038 VSOMEIP_ERROR << __func__ << " Client endpoint creation failed";
1039 }
1040
1041 return (its_endpoint);
1042 }
1043
1044 void
log_client_states() const1045 endpoint_manager_impl::log_client_states() const {
1046 std::stringstream its_log;
1047 client_endpoints_by_ip_t its_client_endpoints;
1048 std::vector<
1049 std::pair<
1050 std::tuple<boost::asio::ip::address, uint16_t, bool>,
1051 size_t
1052 >
1053 > its_client_queue_sizes;
1054
1055 {
1056 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
1057 its_client_endpoints = client_endpoints_by_ip_;
1058 }
1059
1060 for (const auto &its_address : its_client_endpoints) {
1061 for (const auto &its_port : its_address.second) {
1062 for (const auto &its_reliability : its_port.second) {
1063 for (const auto &its_partition : its_reliability.second) {
1064 size_t its_queue_size = its_partition.second->get_queue_size();
1065 if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
1066 its_client_queue_sizes.push_back(
1067 std::make_pair(
1068 std::make_tuple(
1069 its_address.first,
1070 its_port.first,
1071 its_reliability.first),
1072 its_queue_size));
1073 }
1074 }
1075 }
1076 }
1077
1078 std::sort(its_client_queue_sizes.begin(), its_client_queue_sizes.end(),
1079 [](const std::pair<
1080 std::tuple<boost::asio::ip::address, uint16_t, bool>,
1081 size_t> &_a,
1082 const std::pair<
1083 std::tuple<boost::asio::ip::address, uint16_t, bool>,
1084 size_t> &_b) {
1085 return (_a.second > _b.second);
1086 });
1087
1088 size_t its_max(std::min(size_t(5), its_client_queue_sizes.size()));
1089 for (size_t i = 0; i < its_max; i++) {
1090 its_log << std::hex << std::setw(4) << std::setfill('0')
1091 << std::get<0>(its_client_queue_sizes[i].first).to_string()
1092 << ":" << std::dec << std::get<1>(its_client_queue_sizes[i].first)
1093 << "(" << (std::get<2>(its_client_queue_sizes[i].first) ? "tcp" : "udp") << "):"
1094 << std::dec << its_client_queue_sizes[i].second;
1095 if (i < its_max-1)
1096 its_log << ", ";
1097 }
1098
1099 if (its_log.str().length() > 0)
1100 VSOMEIP_INFO << "ECQ: [" << its_log.str() << "]";
1101 }
1102
1103 void
log_server_states() const1104 endpoint_manager_impl::log_server_states() const {
1105 std::stringstream its_log;
1106 server_endpoints_t its_server_endpoints;
1107 std::vector<
1108 std::pair<
1109 std::pair<uint16_t, bool>,
1110 size_t
1111 >
1112 > its_client_queue_sizes;
1113
1114 {
1115 std::lock_guard<std::recursive_mutex> its_lock(endpoint_mutex_);
1116 its_server_endpoints = server_endpoints_;
1117 }
1118
1119 for (const auto &its_port : its_server_endpoints) {
1120 for (const auto &its_reliability : its_port.second) {
1121 size_t its_queue_size = its_reliability.second->get_queue_size();
1122 if (its_queue_size > VSOMEIP_DEFAULT_QUEUE_WARN_SIZE)
1123 its_client_queue_sizes.push_back(
1124 std::make_pair(
1125 std::make_pair(
1126 its_port.first,
1127 its_reliability.first),
1128 its_queue_size));
1129 }
1130 }
1131
1132 std::sort(its_client_queue_sizes.begin(), its_client_queue_sizes.end(),
1133 [](const std::pair<std::pair<uint16_t, bool>, size_t> &_a,
1134 const std::pair<std::pair<uint16_t, bool>, size_t> &_b) {
1135 return (_a.second > _b.second);
1136 });
1137
1138 size_t its_max(std::min(size_t(5), its_client_queue_sizes.size()));
1139 for (size_t i = 0; i < its_max; i++) {
1140 its_log << std::dec << its_client_queue_sizes[i].first.first
1141 << "(" << (its_client_queue_sizes[i].first.second ? "tcp" : "udp") << "):"
1142 << std::dec << its_client_queue_sizes[i].second;
1143 if (i < its_max-1)
1144 its_log << ", ";
1145 }
1146
1147 if (its_log.str().length() > 0)
1148 VSOMEIP_INFO << "ESQ: [" << its_log.str() << "]";
1149 }
1150
1151 } // namespace vsomeip_v3
1152