1 // Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <climits>
7 #include <iomanip>
8 #include <memory>
9 #include <sstream>
10 #include <forward_list>
11
12 #ifndef _WIN32
13 #include <unistd.h>
14 #include <cstdio>
15 #include <time.h>
16 #endif
17
18 #include <boost/asio/steady_timer.hpp>
19
20 #include <vsomeip/constants.hpp>
21 #include <vsomeip/payload.hpp>
22 #include <vsomeip/runtime.hpp>
23 #include <vsomeip/internal/logger.hpp>
24
25 #include "../include/event.hpp"
26 #include "../include/eventgroupinfo.hpp"
27 #include "../include/remote_subscription.hpp"
28 #include "../include/routing_manager_host.hpp"
29 #include "../include/routing_manager_impl.hpp"
30 #include "../include/routing_manager_stub.hpp"
31 #include "../include/serviceinfo.hpp"
32 #include "../../configuration/include/configuration.hpp"
33 #include "../../security/include/security.hpp"
34
35 #include "../../endpoints/include/endpoint_definition.hpp"
36 #include "../../endpoints/include/local_client_endpoint_impl.hpp"
37 #include "../../endpoints/include/tcp_client_endpoint_impl.hpp"
38 #include "../../endpoints/include/tcp_server_endpoint_impl.hpp"
39 #include "../../endpoints/include/udp_client_endpoint_impl.hpp"
40 #include "../../endpoints/include/udp_server_endpoint_impl.hpp"
41 #include "../../endpoints/include/virtual_server_endpoint_impl.hpp"
42 #include "../../message/include/deserializer.hpp"
43 #include "../../message/include/message_impl.hpp"
44 #include "../../message/include/serializer.hpp"
45 #include "../../service_discovery/include/constants.hpp"
46 #include "../../service_discovery/include/defines.hpp"
47 #include "../../service_discovery/include/runtime.hpp"
48 #include "../../service_discovery/include/service_discovery.hpp"
49 #include "../../utility/include/byteorder.hpp"
50 #include "../../utility/include/utility.hpp"
51 #include "../../plugin/include/plugin_manager_impl.hpp"
52 #ifdef USE_DLT
53 #include "../../tracing/include/connector_impl.hpp"
54 #endif
55
56 #ifndef ANDROID
57 #include "../../e2e_protection/include/buffer/buffer.hpp"
58 #include "../../e2e_protection/include/e2exf/config.hpp"
59
60 #include "../../e2e_protection/include/e2e/profile/e2e_provider.hpp"
61 #endif
62
63 #ifdef USE_DLT
64 #include "../../tracing/include/connector_impl.hpp"
65 #endif
66
67 namespace vsomeip_v3 {
68
#ifdef ANDROID
namespace sd {

// Out-of-line (empty) definition of the service discovery runtime destructor.
// It is only compiled for Android builds.
runtime::~runtime() {
}

} // namespace sd
#endif
74
routing_manager_impl(routing_manager_host * _host)75 routing_manager_impl::routing_manager_impl(routing_manager_host *_host) :
76 routing_manager_base(_host),
77 version_log_timer_(_host->get_io()),
78 if_state_running_(false),
79 sd_route_set_(false),
80 routing_running_(false),
81 status_log_timer_(_host->get_io()),
82 memory_log_timer_(_host->get_io()),
83 ep_mgr_impl_(std::make_shared<endpoint_manager_impl>(this, io_, configuration_)),
84 pending_remote_offer_id_(0),
85 last_resume_(std::chrono::steady_clock::now().min()),
86 statistics_log_timer_(_host->get_io()),
87 ignored_statistics_counter_(0)
88 {
89 }
90
~routing_manager_impl()91 routing_manager_impl::~routing_manager_impl() {
92 utility::remove_lockfile(configuration_);
93 utility::reset_client_ids();
94 }
95
get_io()96 boost::asio::io_service & routing_manager_impl::get_io() {
97 return routing_manager_base::get_io();
98 }
99
get_client() const100 client_t routing_manager_impl::get_client() const {
101 return routing_manager_base::get_client();
102 }
103
find_local_clients(service_t _service,instance_t _instance)104 std::set<client_t> routing_manager_impl::find_local_clients(service_t _service, instance_t _instance) {
105 return routing_manager_base::find_local_clients(_service, _instance);
106 }
107
find_local_client(service_t _service,instance_t _instance)108 client_t routing_manager_impl::find_local_client(service_t _service, instance_t _instance) {
109 return routing_manager_base::find_local_client(_service, _instance);
110 }
111
is_subscribe_to_any_event_allowed(credentials_t _credentials,client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup)112 bool routing_manager_impl::is_subscribe_to_any_event_allowed(credentials_t _credentials, client_t _client,
113 service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
114 return routing_manager_base::is_subscribe_to_any_event_allowed(_credentials, _client,
115 _service, _instance, _eventgroup);
116 }
117
init()118 void routing_manager_impl::init() {
119 routing_manager_base::init(ep_mgr_impl_);
120
121 // TODO: Only instantiate the stub if needed
122 stub_ = std::make_shared<routing_manager_stub>(this, configuration_);
123 stub_->init();
124
125 if (configuration_->is_sd_enabled()) {
126 VSOMEIP_INFO<< "Service Discovery enabled. Trying to load module.";
127 auto its_plugin = plugin_manager::get()->get_plugin(
128 plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY);
129 if (its_plugin) {
130 VSOMEIP_INFO << "Service Discovery module loaded.";
131 discovery_ = std::dynamic_pointer_cast<sd::runtime>(its_plugin)->create_service_discovery(this, configuration_);
132 discovery_->init();
133 } else {
134 VSOMEIP_ERROR << "Service Discovery module could not be loaded!";
135 std::exit(EXIT_FAILURE);
136 }
137 }
138
139 #ifndef ANDROID
140 if( configuration_->is_e2e_enabled()) {
141 VSOMEIP_INFO << "E2E protection enabled.";
142
143 const char *its_e2e_module = getenv(VSOMEIP_ENV_E2E_PROTECTION_MODULE);
144 std::string plugin_name = its_e2e_module != nullptr ? its_e2e_module : VSOMEIP_E2E_LIBRARY;
145
146 auto its_plugin = plugin_manager::get()->get_plugin(plugin_type_e::APPLICATION_PLUGIN, plugin_name);
147 if (its_plugin) {
148 VSOMEIP_INFO << "E2E module loaded.";
149 e2e_provider_ = std::dynamic_pointer_cast<e2e::e2e_provider>(its_plugin);
150 }
151 }
152
153 if(e2e_provider_) {
154 std::map<e2exf::data_identifier_t, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration();
155 for (auto &identifier : its_e2e_configuration) {
156 if(!e2e_provider_->add_configuration(identifier.second)) {
157 VSOMEIP_INFO << "Unknown E2E profile: " << identifier.second->profile << ", skipping ...";
158 }
159 }
160 }
161 #endif
162 }
163
start()164 void routing_manager_impl::start() {
165 #ifndef _WIN32
166 boost::asio::ip::address its_multicast;
167 try {
168 its_multicast = boost::asio::ip::address::from_string(configuration_->get_sd_multicast());
169 } catch (...) {
170 VSOMEIP_ERROR << "Illegal multicast address \""
171 << configuration_->get_sd_multicast()
172 << "\". Please check your configuration.";
173 }
174
175 netlink_connector_ = std::make_shared<netlink_connector>(
176 host_->get_io(), configuration_->get_unicast_address(), its_multicast);
177 netlink_connector_->register_net_if_changes_handler(
178 std::bind(&routing_manager_impl::on_net_interface_or_route_state_changed,
179 this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
180 netlink_connector_->start();
181 #else
182 {
183 std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
184 start_ip_routing();
185 }
186 #endif
187
188 stub_->start();
189 host_->on_state(state_type_e::ST_REGISTERED);
190
191 if (configuration_->log_version()) {
192 std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
193 version_log_timer_.expires_from_now(
194 std::chrono::seconds(0));
195 version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk,
196 this, std::placeholders::_1));
197 }
198 #ifndef _WIN32
199 if (configuration_->log_memory()) {
200 std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_);
201 boost::system::error_code ec;
202 memory_log_timer_.expires_from_now(std::chrono::seconds(0), ec);
203 memory_log_timer_.async_wait(
204 std::bind(&routing_manager_impl::memory_log_timer_cbk, this,
205 std::placeholders::_1));
206 }
207 #endif
208 if (configuration_->log_status()) {
209 std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_);
210 boost::system::error_code ec;
211 status_log_timer_.expires_from_now(std::chrono::seconds(0), ec);
212 status_log_timer_.async_wait(
213 std::bind(&routing_manager_impl::status_log_timer_cbk, this,
214 std::placeholders::_1));
215 }
216
217 if (configuration_->log_statistics()) {
218 std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_);
219 boost::system::error_code ec;
220 statistics_log_timer_.expires_from_now(std::chrono::seconds(0), ec);
221 statistics_log_timer_.async_wait(
222 std::bind(&routing_manager_impl::statistics_log_timer_cbk, this,
223 std::placeholders::_1));
224 }
225 }
226
stop()227 void routing_manager_impl::stop() {
228 // Ensure to StopOffer all services that are offered by the application hosting the rm
229 local_services_map_t its_services;
230 {
231 std::lock_guard<std::mutex> its_lock(local_services_mutex_);
232 for (const auto& s : local_services_) {
233 for (const auto& i : s.second) {
234 if (std::get<2>(i.second) == client_) {
235 its_services[s.first][i.first] = i.second;
236 }
237 }
238 }
239
240 }
241 for (const auto& s : its_services) {
242 for (const auto& i : s.second) {
243 on_stop_offer_service(std::get<2>(i.second), s.first, i.first,
244 std::get<0>(i.second), std::get<1>(i.second));
245 }
246 }
247
248 {
249 std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
250 version_log_timer_.cancel();
251 }
252 #ifndef _WIN32
253 {
254 boost::system::error_code ec;
255 std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_);
256 memory_log_timer_.cancel(ec);
257 }
258 if (netlink_connector_) {
259 netlink_connector_->stop();
260 }
261 #endif
262
263 {
264 std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_);
265 boost::system::error_code ec;
266 status_log_timer_.cancel(ec);
267 }
268
269 {
270 std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_);
271 boost::system::error_code ec;
272 statistics_log_timer_.cancel(ec);
273 }
274
275 host_->on_state(state_type_e::ST_DEREGISTERED);
276
277 if (discovery_)
278 discovery_->stop();
279 stub_->stop();
280
281 for (const auto& client : ep_mgr_->get_connected_clients()) {
282 if (client != VSOMEIP_ROUTING_CLIENT) {
283 remove_local(client, true);
284 }
285 }
286 }
287
insert_offer_command(service_t _service,instance_t _instance,uint8_t _command,client_t _client,major_version_t _major,minor_version_t _minor)288 bool routing_manager_impl::insert_offer_command(service_t _service, instance_t _instance, uint8_t _command,
289 client_t _client, major_version_t _major, minor_version_t _minor) {
290 std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_);
291 // flag to indicate whether caller of this function can start directly processing the command
292 bool must_process(false);
293 auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance));
294 if (found_service_instance != offer_commands_.end()) {
295 // if nothing is queued
296 if (found_service_instance->second.empty()) {
297 must_process = true;
298 }
299 found_service_instance->second.push_back(
300 std::make_tuple(_command, _client, _major, _minor));
301 } else {
302 // nothing is queued -> add command to queue and process command directly
303 offer_commands_[std::make_pair(_service, _instance)].push_back(
304 std::make_tuple(_command, _client, _major, _minor));
305 must_process = true;
306 }
307 return must_process;
308 }
309
erase_offer_command(service_t _service,instance_t _instance)310 bool routing_manager_impl::erase_offer_command(service_t _service, instance_t _instance) {
311 std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_);
312 auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance));
313 if (found_service_instance != offer_commands_.end()) {
314 // erase processed command
315 if (!found_service_instance->second.empty()) {
316 found_service_instance->second.pop_front();
317 if (!found_service_instance->second.empty()) {
318 // check for other commands to be processed
319 auto its_command = found_service_instance->second.front();
320 if (std::get<0>(its_command) == VSOMEIP_OFFER_SERVICE) {
321 io_.post([&, its_command, _service, _instance](){
322 offer_service(std::get<1>(its_command), _service, _instance,
323 std::get<2>(its_command), std::get<3>(its_command), false);
324 });
325 } else {
326 io_.post([&, its_command, _service, _instance](){
327 stop_offer_service(std::get<1>(its_command), _service, _instance,
328 std::get<2>(its_command), std::get<3>(its_command), false);
329 });
330 }
331 }
332 }
333 }
334 return true;
335 }
336
offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)337 bool routing_manager_impl::offer_service(client_t _client,
338 service_t _service, instance_t _instance,
339 major_version_t _major, minor_version_t _minor) {
340
341 return offer_service(_client, _service, _instance, _major, _minor, true);
342 }
343
offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,bool _must_queue)344 bool routing_manager_impl::offer_service(client_t _client,
345 service_t _service, instance_t _instance,
346 major_version_t _major, minor_version_t _minor,
347 bool _must_queue) {
348
349 VSOMEIP_INFO << "OFFER("
350 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
351 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
352 << std::hex << std::setw(4) << std::setfill('0') << _instance
353 << ":" << std::dec << int(_major) << "." << std::dec << _minor << "]"
354 << " (" << std::boolalpha << _must_queue << ")";
355
356 // only queue commands if method was NOT called via erase_offer_command()
357 if (_must_queue) {
358 if (!insert_offer_command(_service, _instance, VSOMEIP_OFFER_SERVICE,
359 _client, _major, _minor)) {
360 return false;
361 }
362 }
363
364 // Check if the application hosted by routing manager is allowed to offer
365 // offer_service requests of local proxies are checked in rms::on:message
366 if (_client == get_client()) {
367 #ifdef _WIN32
368 std::uint32_t its_routing_uid = ANY_UID;
369 std::uint32_t its_routing_gid = ANY_GID;
370 #else
371 std::uint32_t its_routing_uid = getuid();
372 std::uint32_t its_routing_gid = getgid();
373 #endif
374 if (!security::get()->is_offer_allowed(its_routing_uid, its_routing_gid,
375 _client, _service, _instance)) {
376 VSOMEIP_WARNING << "routing_manager_impl::offer_service: "
377 << std::hex << "Security: Client 0x" << _client
378 << " isn't allowed to offer the following service/instance "
379 << _service << "/" << _instance
380 << " ~> Skip offer!";
381 erase_offer_command(_service, _instance);
382 return false;
383 }
384 }
385
386 if (!handle_local_offer_service(_client, _service, _instance, _major, _minor)) {
387 erase_offer_command(_service, _instance);
388 return false;
389 }
390
391 {
392 std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
393 if (if_state_running_) {
394 init_service_info(_service, _instance, true);
395 } else {
396 pending_sd_offers_.push_back(std::make_pair(_service, _instance));
397 }
398 }
399
400 if (discovery_) {
401 std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
402 if (its_info) {
403 discovery_->offer_service(its_info);
404 }
405 }
406
407 {
408 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
409 std::set<event_t> its_already_subscribed_events;
410 for (auto &ps : pending_subscriptions_) {
411 if (ps.service_ == _service
412 && ps.instance_ == _instance
413 && ps.major_ == _major) {
414 insert_subscription(ps.service_, ps.instance_,
415 ps.eventgroup_, ps.event_, client_, &its_already_subscribed_events);
416 #if 0
417 VSOMEIP_ERROR << __func__
418 << ": event="
419 << std::hex << ps.service_ << "."
420 << std::hex << ps.instance_ << "."
421 << std::hex << ps.event_;
422 #endif
423 }
424 }
425
426 send_pending_subscriptions(_service, _instance, _major);
427 }
428 stub_->on_offer_service(_client, _service, _instance, _major, _minor);
429 on_availability(_service, _instance, true, _major, _minor);
430 erase_offer_command(_service, _instance);
431 return true;
432 }
433
stop_offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)434 void routing_manager_impl::stop_offer_service(client_t _client,
435 service_t _service, instance_t _instance,
436 major_version_t _major, minor_version_t _minor) {
437
438 stop_offer_service(_client, _service, _instance, _major, _minor, true);
439 }
440
stop_offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,bool _must_queue)441 void routing_manager_impl::stop_offer_service(client_t _client,
442 service_t _service, instance_t _instance,
443 major_version_t _major, minor_version_t _minor,
444 bool _must_queue) {
445
446 VSOMEIP_INFO << "STOP OFFER("
447 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
448 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
449 << std::hex << std::setw(4) << std::setfill('0') << _instance
450 << ":" << std::dec << int(_major) << "." << _minor << "]"
451 << " (" << std::boolalpha << _must_queue << ")";
452
453 if (_must_queue) {
454 if (!insert_offer_command(_service, _instance, VSOMEIP_STOP_OFFER_SERVICE,
455 _client, _major, _minor)) {
456 return;
457 }
458 }
459
460 bool is_local(false);
461 {
462 std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
463 is_local = (its_info && its_info->is_local());
464 }
465 if (is_local) {
466 {
467 std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
468 for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
469 if (it->first == _service && it->second == _instance) {
470 it = pending_sd_offers_.erase(it);
471 break;
472 } else {
473 ++it;
474 }
475 }
476 }
477
478 on_stop_offer_service(_client, _service, _instance, _major, _minor);
479 stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor);
480 on_availability(_service, _instance, false, _major, _minor);
481 } else {
482 VSOMEIP_WARNING << __func__ << " received STOP_OFFER("
483 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
484 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
485 << std::hex << std::setw(4) << std::setfill('0') << _instance
486 << ":" << std::dec << int(_major) << "." << _minor << "] "
487 << "for remote service --> ignore";
488 erase_offer_command(_service, _instance);
489 }
490 }
491
request_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)492 void routing_manager_impl::request_service(client_t _client, service_t _service,
493 instance_t _instance, major_version_t _major, minor_version_t _minor) {
494
495 VSOMEIP_INFO << "REQUEST("
496 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
497 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
498 << std::hex << std::setw(4) << std::setfill('0') << _instance << ":"
499 << std::dec << int(_major) << "." << std::dec << _minor << "]";
500
501 routing_manager_base::request_service(_client,
502 _service, _instance, _major, _minor);
503
504 auto its_info = find_service(_service, _instance);
505 if (!its_info) {
506 requested_service_add(_client, _service, _instance, _major, _minor);
507 if (discovery_) {
508 if (!configuration_->is_local_service(_service, _instance)) {
509 // Non local service instance ~> tell SD to find it!
510 discovery_->request_service(_service, _instance, _major, _minor,
511 DEFAULT_TTL);
512 } else {
513 VSOMEIP_INFO << std::hex
514 << "Avoid trigger SD find-service message"
515 << " for local service/instance/major/minor: "
516 << _service << "/" << _instance << std::dec
517 << "/" << (uint32_t)_major << "/" << _minor;
518 }
519 }
520 } else {
521 if ((_major == its_info->get_major()
522 || DEFAULT_MAJOR == its_info->get_major()
523 || ANY_MAJOR == _major)
524 && (_minor <= its_info->get_minor()
525 || DEFAULT_MINOR == its_info->get_minor()
526 || _minor == ANY_MINOR)) {
527 if(!its_info->is_local()) {
528 requested_service_add(_client, _service, _instance, _major, _minor);
529 if (discovery_) {
530 // Non local service instance ~> tell SD to find it!
531 discovery_->request_service(_service, _instance, _major,
532 _minor, DEFAULT_TTL);
533 }
534 its_info->add_client(_client);
535 ep_mgr_impl_->find_or_create_remote_client(_service, _instance);
536 }
537 }
538 }
539
540 if (_client == get_client()) {
541 stub_->create_local_receiver();
542
543 service_data_t request = { _service, _instance, _major, _minor };
544 std::set<service_data_t> requests;
545 requests.insert(request);
546 stub_->handle_requests(_client, requests);
547 }
548 }
549
release_service(client_t _client,service_t _service,instance_t _instance)550 void routing_manager_impl::release_service(client_t _client, service_t _service,
551 instance_t _instance) {
552
553 VSOMEIP_INFO << "RELEASE("
554 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
555 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
556 << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
557
558 if (host_->get_client() == _client) {
559 std::lock_guard<std::mutex> its_lock(pending_subscription_mutex_);
560 remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT);
561 }
562 routing_manager_base::release_service(_client, _service, _instance);
563 requested_service_remove(_client, _service, _instance);
564
565 std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
566 if (its_info && !its_info->is_local()) {
567 if (!its_info->get_requesters_size()) {
568 if (discovery_) {
569 discovery_->release_service(_service, _instance);
570 discovery_->unsubscribe_all(_service, _instance);
571 }
572 ep_mgr_impl_->clear_client_endpoints(_service, _instance, true);
573 ep_mgr_impl_->clear_client_endpoints(_service, _instance, false);
574 its_info->set_endpoint(nullptr, true);
575 its_info->set_endpoint(nullptr, false);
576 unset_all_eventpayloads(_service, _instance);
577 }
578 } else {
579 if (discovery_) {
580 discovery_->release_service(_service, _instance);
581 }
582 }
583 }
584
subscribe(client_t _client,uid_t _uid,gid_t _gid,service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,event_t _event)585 void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
586 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
587 major_version_t _major, event_t _event) {
588
589 VSOMEIP_INFO << "SUBSCRIBE("
590 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
591 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
592 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
593 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << ":"
594 << std::hex << std::setw(4) << std::setfill('0') << _event << ":"
595 << std::dec << (uint16_t)_major << "]";
596 const client_t its_local_client = find_local_client(_service, _instance);
597 if (get_client() == its_local_client) {
598 #ifdef VSOMEIP_ENABLE_COMPAT
599 routing_manager_base::set_incoming_subscription_state(_client, _service, _instance,
600 _eventgroup, _event, subscription_state_e::IS_SUBSCRIBING);
601 #endif
602 auto self = shared_from_this();
603 host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, true,
604 [this, self, _client, _uid, _gid, _service, _instance, _eventgroup,
605 _event, _major]
606 (const bool _subscription_accepted) {
607 (void) ep_mgr_->find_or_create_local(_client);
608 if (!_subscription_accepted) {
609 stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, _event);
610 VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex
611 << _client << std::dec << " for eventgroup: 0x" << _eventgroup
612 << " rejected from application handler.";
613 return;
614 } else {
615 stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event);
616 }
617 routing_manager_base::subscribe(_client, _uid, _gid, _service, _instance, _eventgroup, _major, _event);
618 #ifdef VSOMEIP_ENABLE_COMPAT
619 send_pending_notify_ones(_service, _instance, _eventgroup, _client);
620 routing_manager_base::erase_incoming_subscription_state(_client, _service, _instance,
621 _eventgroup, _event);
622 #endif
623 });
624 } else {
625 if (discovery_) {
626 std::set<event_t> its_already_subscribed_events;
627
628 // Note: The calls to insert_subscription & handle_subscription_state must not
629 // run concurrently to a call to on_subscribe_ack. Therefore the lock is acquired
630 // before calling insert_subscription and released after the call to
631 // handle_subscription_state.
632 std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_);
633 bool inserted = insert_subscription(_service, _instance, _eventgroup,
634 _event, _client, &its_already_subscribed_events);
635 const bool subscriber_is_rm_host = (get_client() == _client);
636 if (inserted) {
637 if (0 == its_local_client) {
638 handle_subscription_state(_client, _service, _instance, _eventgroup, _event);
639 its_critical.unlock();
640 static const ttl_t configured_ttl(configuration_->get_sd_ttl());
641 notify_one_current_value(_client, _service, _instance,
642 _eventgroup, _event, its_already_subscribed_events);
643
644 auto its_info = find_eventgroup(_service, _instance, _eventgroup);
645 // if the subscriber is the rm_host itself: check if service
646 // is available before subscribing via SD otherwise we sent
647 // a StopSubscribe/Subscribe once the first offer is received
648 if (its_info &&
649 (!subscriber_is_rm_host || find_service(_service, _instance))) {
650 discovery_->subscribe(_service, _instance, _eventgroup,
651 _major, configured_ttl,
652 its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT,
653 its_info);
654 }
655 } else {
656 its_critical.unlock();
657 if (is_available(_service, _instance, _major)) {
658 stub_->send_subscribe(ep_mgr_->find_local(_service, _instance),
659 _client, _service, _instance, _eventgroup, _major, _event,
660 PENDING_SUBSCRIPTION_ID);
661 }
662 }
663 }
664 if (subscriber_is_rm_host) {
665 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
666 subscription_data_t subscription = {
667 _service, _instance, _eventgroup, _major, _event, _uid, _gid
668 };
669 pending_subscriptions_.insert(subscription);
670 }
671 } else {
672 VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!";
673 }
674 }
675 }
676
unsubscribe(client_t _client,uid_t _uid,gid_t _gid,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)677 void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid,
678 service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
679
680 VSOMEIP_INFO << "UNSUBSCRIBE("
681 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
682 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
683 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
684 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
685 << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
686
687 bool last_subscriber_removed(true);
688
689 std::shared_ptr<eventgroupinfo> its_info
690 = find_eventgroup(_service, _instance, _eventgroup);
691 if (its_info) {
692 for (const auto& e : its_info->get_events()) {
693 if (e->get_event() == _event || ANY_EVENT == _event)
694 e->remove_subscriber(_eventgroup, _client);
695 }
696 for (const auto& e : its_info->get_events()) {
697 if (e->has_subscriber(_eventgroup, ANY_CLIENT)) {
698 last_subscriber_removed = false;
699 break;
700 }
701 }
702 }
703
704 if (discovery_) {
705 host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false,
706 [](const bool _subscription_accepted){ (void)_subscription_accepted; });
707 if (0 == find_local_client(_service, _instance)) {
708 if (get_client() == _client) {
709 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
710 remove_pending_subscription(_service, _instance, _eventgroup, _event);
711 }
712 if (last_subscriber_removed) {
713 unset_all_eventpayloads(_service, _instance, _eventgroup);
714 {
715 auto tuple = std::make_tuple(_service, _instance, _eventgroup, _client);
716 std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
717 remote_subscription_state_.erase(tuple);
718 }
719 }
720
721 if (its_info &&
722 (last_subscriber_removed || its_info->is_selective())) {
723
724 discovery_->unsubscribe(_service, _instance, _eventgroup,
725 its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT);
726 }
727 } else {
728 if (get_client() == _client) {
729 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
730 remove_pending_subscription(_service, _instance, _eventgroup, _event);
731 stub_->send_unsubscribe(
732 ep_mgr_->find_local(_service, _instance),
733 _client, _service, _instance, _eventgroup, _event,
734 PENDING_SUBSCRIPTION_ID);
735 }
736 }
737 ep_mgr_impl_->clear_multicast_endpoints(_service, _instance);
738
739 } else {
740 VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!";
741 }
742 }
743
send(client_t _client,std::shared_ptr<message> _message)744 bool routing_manager_impl::send(client_t _client,
745 std::shared_ptr<message> _message) {
746 return routing_manager_base::send(_client, _message);
747 }
748
send(client_t _client,const byte_t * _data,length_t _size,instance_t _instance,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _status_check,bool _sent_from_remote)749 bool routing_manager_impl::send(client_t _client, const byte_t *_data,
750 length_t _size, instance_t _instance, bool _reliable,
751 client_t _bound_client,
752 credentials_t _credentials,
753 uint8_t _status_check, bool _sent_from_remote) {
754 bool is_sent(false);
755 if (_size > VSOMEIP_MESSAGE_TYPE_POS) {
756 std::shared_ptr<endpoint> its_target;
757 bool is_request = utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS]);
758 bool is_notification = utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS]);
759 bool is_response = utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS]);
760
761 client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
762 _data[VSOMEIP_CLIENT_POS_MAX]);
763 service_t its_service = VSOMEIP_BYTES_TO_WORD(
764 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
765 method_t its_method = VSOMEIP_BYTES_TO_WORD(
766 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
767
768 bool is_service_discovery
769 = (its_service == sd::service && its_method == sd::method);
770
771 if (is_request) {
772 its_target = ep_mgr_->find_local(its_service, _instance);
773 } else if (!is_notification) {
774 its_target = find_local(its_client);
775 } else if (is_notification && _client && !is_service_discovery) { // Selective notifications!
776 if (_client == get_client()) {
777 #ifdef USE_DLT
778 const uint16_t its_data_size
779 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
780
781 trace::header its_header;
782 if (its_header.prepare(its_target, true, _instance))
783 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
784 _data, its_data_size);
785 #endif
786 deliver_message(_data, _size, _instance, _reliable, _bound_client, _credentials, _status_check, _sent_from_remote);
787 return true;
788 }
789 its_target = find_local(_client);
790 }
791
792 if (its_target) {
793 #ifdef USE_DLT
794 if ((is_request && its_client == get_client()) ||
795 (is_response && find_local_client(its_service, _instance) == get_client()) ||
796 (is_notification && find_local_client(its_service, _instance) == VSOMEIP_ROUTING_CLIENT)) {
797 const uint16_t its_data_size
798 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
799
800 trace::header its_header;
801 if (its_header.prepare(its_target, true, _instance))
802 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
803 _data, its_data_size);
804 }
805 #endif
806 is_sent = send_local(its_target, get_client(), _data, _size, _instance, _reliable, VSOMEIP_SEND, _status_check);
807 } else {
808 // Check whether hosting application should get the message
809 // If not, check routes to external
810 if ((its_client == host_->get_client() && is_response)
811 || (find_local_client(its_service, _instance)
812 == host_->get_client() && is_request)) {
813 // TODO: Find out how to handle session id here
814 is_sent = deliver_message(_data, _size, _instance, _reliable, VSOMEIP_ROUTING_CLIENT, _credentials, _status_check);
815 } else {
816 e2e_buffer its_buffer;
817
818 if (e2e_provider_) {
819 if ( !is_service_discovery) {
820 service_t its_service = VSOMEIP_BYTES_TO_WORD(
821 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
822 method_t its_method = VSOMEIP_BYTES_TO_WORD(
823 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
824 #ifndef ANDROID
825 if (e2e_provider_->is_protected({its_service, its_method})) {
826 // Find out where the protected area starts
827 size_t its_base = e2e_provider_->get_protection_base({its_service, its_method});
828
829 // Build a corresponding buffer
830 its_buffer.assign(_data + its_base, _data + _size);
831
832 e2e_provider_->protect({ its_service, its_method }, its_buffer, _instance);
833
834 // Prepend header
835 its_buffer.insert(its_buffer.begin(), _data, _data + its_base);
836
837 _data = its_buffer.data();
838 }
839 #endif
840 }
841 }
842 if (is_request) {
843 its_target = ep_mgr_impl_->find_or_create_remote_client(
844 its_service, _instance, _reliable);
845 if (its_target) {
846 #ifdef USE_DLT
847 const uint16_t its_data_size
848 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
849
850 trace::header its_header;
851 if (its_header.prepare(its_target, true, _instance))
852 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
853 _data, its_data_size);
854 #endif
855 is_sent = its_target->send(_data, _size);
856 } else {
857 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
858 _data[VSOMEIP_SESSION_POS_MIN],
859 _data[VSOMEIP_SESSION_POS_MAX]);
860 VSOMEIP_ERROR<< "Routing info for remote service could not be found! ("
861 << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
862 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
863 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
864 << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
865 << std::hex << std::setw(4) << std::setfill('0') << its_session;
866 }
867 } else {
868 std::shared_ptr<serviceinfo> its_info(find_service(its_service, _instance));
869 if (its_info || is_service_discovery) {
870 if (is_notification && !is_service_discovery) {
871 send_local_notification(get_client(), _data, _size, _instance, _reliable, _status_check);
872 method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
873 _data[VSOMEIP_METHOD_POS_MAX]);
874 std::shared_ptr<event> its_event = find_event(its_service, _instance, its_method);
875 if (its_event) {
876 #ifdef USE_DLT
877 bool has_sent(false);
878 #endif
879 std::set<std::shared_ptr<endpoint_definition>> its_targets;
880 // we need both endpoints as clients can subscribe to events via TCP and UDP
881 std::shared_ptr<endpoint> its_udp_server_endpoint = its_info->get_endpoint(false);
882 std::shared_ptr<endpoint> its_tcp_server_endpoint = its_info->get_endpoint(true);
883
884 if (its_udp_server_endpoint || its_tcp_server_endpoint) {
885 const auto its_reliability = its_event->get_reliability();
886 for (auto its_group : its_event->get_eventgroups()) {
887 auto its_eventgroup = find_eventgroup(its_service, _instance, its_group);
888 if (its_eventgroup) {
889 // Unicast targets
890 for (const auto &its_remote : its_eventgroup->get_unicast_targets()) {
891 if (its_remote->is_reliable() && its_tcp_server_endpoint) {
892 if (its_reliability == reliability_type_e::RT_RELIABLE
893 || its_reliability == reliability_type_e::RT_BOTH) {
894 its_targets.insert(its_remote);
895 }
896 } else if (its_udp_server_endpoint && !its_eventgroup->is_sending_multicast()) {
897 if (its_reliability == reliability_type_e::RT_UNRELIABLE
898 || its_reliability == reliability_type_e::RT_BOTH) {
899 its_targets.insert(its_remote);
900 }
901 }
902 }
903 // Send to multicast targets if subscribers are still interested
904 if (its_eventgroup->is_sending_multicast()) {
905 if (its_reliability == reliability_type_e::RT_UNRELIABLE
906 || its_reliability == reliability_type_e::RT_BOTH) {
907 boost::asio::ip::address its_address;
908 uint16_t its_port;
909 if (its_eventgroup->get_multicast(its_address, its_port)) {
910 std::shared_ptr<endpoint_definition> its_multicast_target;
911 its_multicast_target = endpoint_definition::get(its_address,
912 its_port, false, its_service, _instance);
913 its_targets.insert(its_multicast_target);
914 }
915 }
916 }
917 }
918 }
919 }
920
921 for (auto const &target : its_targets) {
922 if (target->is_reliable()) {
923 its_tcp_server_endpoint->send_to(target, _data, _size);
924 } else {
925 its_udp_server_endpoint->send_to(target, _data, _size);
926 }
927 #ifdef USE_DLT
928 has_sent = true;
929 #endif
930 }
931 #ifdef USE_DLT
932 if (has_sent) {
933 const uint16_t its_data_size
934 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
935
936 trace::header its_header;
937 if (its_header.prepare(nullptr, true, _instance))
938 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
939 _data, its_data_size);
940 }
941 #endif
942 }
943 } else {
944 if ((utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS])
945 || utility::is_error(_data[VSOMEIP_MESSAGE_TYPE_POS]))
946 && !its_info->is_local()) {
947 // We received a response/error but neither the hosting application
948 // nor another local client could be found --> drop
949 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
950 _data[VSOMEIP_SESSION_POS_MIN],
951 _data[VSOMEIP_SESSION_POS_MAX]);
952 VSOMEIP_ERROR
953 << "routing_manager_impl::send: Received response/error for unknown client ("
954 << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
955 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
956 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
957 << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
958 << std::hex << std::setw(4) << std::setfill('0') << its_session;
959 return false;
960 }
961 its_target = is_service_discovery ?
962 (sd_info_ ? sd_info_->get_endpoint(false) : nullptr) : its_info->get_endpoint(_reliable);
963 if (its_target) {
964 #ifdef USE_DLT
965 const uint16_t its_data_size
966 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
967
968 trace::header its_header;
969 if (its_header.prepare(its_target, true, _instance))
970 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
971 _data, its_data_size);
972 #endif
973 is_sent = its_target->send(_data, _size);
974 } else {
975 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
976 _data[VSOMEIP_SESSION_POS_MIN],
977 _data[VSOMEIP_SESSION_POS_MAX]);
978 VSOMEIP_ERROR << "Routing error. Endpoint for service ("
979 << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
980 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
981 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
982 << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
983 << std::hex << std::setw(4) << std::setfill('0') << its_session
984 << " could not be found!";
985 }
986 }
987 } else {
988 if (!is_notification) {
989 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
990 _data[VSOMEIP_SESSION_POS_MIN],
991 _data[VSOMEIP_SESSION_POS_MAX]);
992 VSOMEIP_ERROR << "Routing error. Not hosting service ("
993 << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
994 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
995 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
996 << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
997 << std::hex << std::setw(4) << std::setfill('0') << its_session;
998 }
999 }
1000 }
1001 }
1002 }
1003 }
1004
1005 return (is_sent);
1006 }
1007
send_to(const client_t _client,const std::shared_ptr<endpoint_definition> & _target,std::shared_ptr<message> _message)1008 bool routing_manager_impl::send_to(
1009 const client_t _client,
1010 const std::shared_ptr<endpoint_definition> &_target,
1011 std::shared_ptr<message> _message) {
1012 bool is_sent(false);
1013
1014 std::shared_ptr<serializer> its_serializer(get_serializer());
1015 if (its_serializer->serialize(_message.get())) {
1016 const byte_t *its_data = its_serializer->get_data();
1017 length_t its_size = its_serializer->get_size();
1018 e2e_buffer its_buffer;
1019 if (e2e_provider_) {
1020 service_t its_service = VSOMEIP_BYTES_TO_WORD(
1021 its_data[VSOMEIP_SERVICE_POS_MIN],
1022 its_data[VSOMEIP_SERVICE_POS_MAX]);
1023 method_t its_method = VSOMEIP_BYTES_TO_WORD(
1024 its_data[VSOMEIP_METHOD_POS_MIN],
1025 its_data[VSOMEIP_METHOD_POS_MAX]);
1026 #ifndef ANDROID
1027 if (e2e_provider_->is_protected({its_service, its_method})) {
1028 auto its_base = e2e_provider_->get_protection_base({its_service, its_method});
1029 its_buffer.assign(its_data + its_base, its_data + its_size);
1030 e2e_provider_->protect({its_service, its_method}, its_buffer, _message->get_instance());
1031 its_buffer.insert(its_buffer.begin(), its_data, its_data + its_base);
1032 its_data = its_buffer.data();
1033 }
1034 #endif
1035 }
1036
1037 const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MIN] = VSOMEIP_WORD_BYTE1(_client);
1038 const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MAX] = VSOMEIP_WORD_BYTE0(_client);
1039
1040 is_sent = send_to(_target, its_data, its_size, _message->get_instance());
1041
1042 its_serializer->reset();
1043 put_serializer(its_serializer);
1044 } else {
1045 VSOMEIP_ERROR<< "routing_manager_impl::send_to: serialization failed.";
1046 }
1047 return (is_sent);
1048 }
1049
send_to(const std::shared_ptr<endpoint_definition> & _target,const byte_t * _data,uint32_t _size,instance_t _instance)1050 bool routing_manager_impl::send_to(
1051 const std::shared_ptr<endpoint_definition> &_target,
1052 const byte_t *_data, uint32_t _size, instance_t _instance) {
1053 std::shared_ptr<endpoint> its_endpoint =
1054 ep_mgr_impl_->find_server_endpoint(
1055 _target->get_remote_port(), _target->is_reliable());
1056
1057 if (its_endpoint) {
1058 #ifdef USE_DLT
1059 const uint16_t its_data_size
1060 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
1061
1062 trace::header its_header;
1063 if (its_header.prepare(its_endpoint, true, _instance))
1064 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
1065 _data, its_data_size);
1066 #else
1067 (void) _instance;
1068 #endif
1069 return its_endpoint->send_to(_target, _data, _size);
1070 }
1071 return false;
1072 }
1073
send_via_sd(const std::shared_ptr<endpoint_definition> & _target,const byte_t * _data,uint32_t _size,uint16_t _sd_port)1074 bool routing_manager_impl::send_via_sd(
1075 const std::shared_ptr<endpoint_definition> &_target,
1076 const byte_t *_data, uint32_t _size, uint16_t _sd_port) {
1077 std::shared_ptr<endpoint> its_endpoint =
1078 ep_mgr_impl_->find_server_endpoint(_sd_port,
1079 _target->is_reliable());
1080
1081 if (its_endpoint) {
1082 #ifdef USE_DLT
1083 if (tc_->is_sd_enabled()) {
1084 const uint16_t its_data_size
1085 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
1086
1087 trace::header its_header;
1088 if (its_header.prepare(its_endpoint, true, 0x0))
1089 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
1090 _data, its_data_size);
1091
1092 }
1093 #endif
1094 return its_endpoint->send_to(_target, _data, _size);
1095 }
1096
1097 return false;
1098 }
1099
register_event(client_t _client,service_t _service,instance_t _instance,event_t _notifier,const std::set<eventgroup_t> & _eventgroups,const event_type_e _type,reliability_type_e _reliability,std::chrono::milliseconds _cycle,bool _change_resets_cycle,bool _update_on_change,epsilon_change_func_t _epsilon_change_func,bool _is_provided,bool _is_shadow,bool _is_cache_placeholder)1100 void routing_manager_impl::register_event(client_t _client,
1101 service_t _service, instance_t _instance,
1102 event_t _notifier,
1103 const std::set<eventgroup_t> &_eventgroups, const event_type_e _type,
1104 reliability_type_e _reliability,
1105 std::chrono::milliseconds _cycle, bool _change_resets_cycle,
1106 bool _update_on_change,
1107 epsilon_change_func_t _epsilon_change_func,
1108 bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
1109 auto its_event = find_event(_service, _instance, _notifier);
1110 bool is_first(false);
1111 if (its_event) {
1112 if (!its_event->has_ref(_client, _is_provided)) {
1113 is_first = true;
1114 }
1115 } else {
1116 is_first = true;
1117 }
1118 if (is_first) {
1119 routing_manager_base::register_event(_client,
1120 _service, _instance,
1121 _notifier,
1122 _eventgroups, _type, _reliability,
1123 _cycle, _change_resets_cycle, _update_on_change,
1124 _epsilon_change_func, _is_provided, _is_shadow,
1125 _is_cache_placeholder);
1126 }
1127 VSOMEIP_INFO << "REGISTER EVENT("
1128 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
1129 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1130 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
1131 << std::hex << std::setw(4) << std::setfill('0') << _notifier
1132 << ":is_provider=" << std::boolalpha << _is_provided << "]";
1133 }
1134
register_shadow_event(client_t _client,service_t _service,instance_t _instance,event_t _notifier,const std::set<eventgroup_t> & _eventgroups,event_type_e _type,reliability_type_e _reliability,bool _is_provided)1135 void routing_manager_impl::register_shadow_event(client_t _client,
1136 service_t _service, instance_t _instance,
1137 event_t _notifier,
1138 const std::set<eventgroup_t> &_eventgroups, event_type_e _type,
1139 reliability_type_e _reliability, bool _is_provided) {
1140 routing_manager_base::register_event(_client,
1141 _service, _instance,
1142 _notifier,
1143 _eventgroups, _type, _reliability,
1144 std::chrono::milliseconds::zero(), false, true,
1145 nullptr,
1146 _is_provided, true);
1147 }
1148
unregister_shadow_event(client_t _client,service_t _service,instance_t _instance,event_t _event,bool _is_provided)1149 void routing_manager_impl::unregister_shadow_event(client_t _client,
1150 service_t _service, instance_t _instance,
1151 event_t _event, bool _is_provided) {
1152 routing_manager_base::unregister_event(_client, _service, _instance,
1153 _event, _is_provided);
1154 }
1155
notify_one(service_t _service,instance_t _instance,event_t _event,std::shared_ptr<payload> _payload,client_t _client,bool _force,bool _remote_subscriber)1156 void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
1157 event_t _event, std::shared_ptr<payload> _payload, client_t _client,
1158 bool _force
1159 #ifdef VSOMEIP_ENABLE_COMPAT
1160 , bool _remote_subscriber
1161 #endif
1162 ) {
1163 if (find_local(_client)) {
1164 routing_manager_base::notify_one(_service, _instance, _event, _payload,
1165 _client, _force
1166 #ifdef VSOMEIP_ENABLE_COMPAT
1167 , _remote_subscriber
1168 #endif
1169 );
1170 } else {
1171 std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
1172 if (its_event) {
1173 std::set<std::shared_ptr<endpoint_definition> > its_targets;
1174 const auto its_reliability = its_event->get_reliability();
1175 for (const auto& g : its_event->get_eventgroups()) {
1176 const auto its_eventgroup = find_eventgroup(_service, _instance, g);
1177 if (its_eventgroup) {
1178 const auto its_subscriptions = its_eventgroup->get_remote_subscriptions();
1179 for (const auto &s : its_subscriptions) {
1180 if (s->has_client(_client)) {
1181 if (its_reliability == reliability_type_e::RT_RELIABLE
1182 || its_reliability == reliability_type_e::RT_BOTH) {
1183 const auto its_reliable = s->get_reliable();
1184 if (its_reliable)
1185 its_targets.insert(its_reliable);
1186 }
1187 if (its_reliability == reliability_type_e::RT_UNRELIABLE
1188 || its_reliability == reliability_type_e::RT_BOTH) {
1189 const auto its_unreliable = s->get_unreliable();
1190 if (its_unreliable)
1191 its_targets.insert(its_unreliable);
1192 }
1193 }
1194 }
1195 }
1196 }
1197
1198 if (its_targets.size() > 0) {
1199 for (const auto &its_target : its_targets) {
1200 its_event->set_payload(_payload, _client, its_target, _force);
1201 }
1202 }
1203 } else {
1204 VSOMEIP_WARNING << "Attempt to update the undefined event/field ["
1205 << std::hex << _service << "." << _instance << "." << _event
1206 << "]";
1207 }
1208 }
1209 }
1210
on_availability(service_t _service,instance_t _instance,bool _is_available,major_version_t _major,minor_version_t _minor)1211 void routing_manager_impl::on_availability(service_t _service, instance_t _instance,
1212 bool _is_available, major_version_t _major, minor_version_t _minor) {
1213
1214 // insert subscriptions of routing manager into service discovery
1215 // to send SubscribeEventgroup after StopOffer / Offer was received
1216 if (_is_available) {
1217 if (discovery_) {
1218 const client_t its_local_client = find_local_client(_service, _instance);
1219 // remote service
1220 if (VSOMEIP_ROUTING_CLIENT == its_local_client) {
1221 static const ttl_t configured_ttl(configuration_->get_sd_ttl());
1222
1223 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
1224 for (auto &ps : pending_subscriptions_) {
1225 if (ps.service_ == _service
1226 && ps.instance_ == _instance
1227 && ps.major_ == _major) {
1228 auto its_info = find_eventgroup(_service, _instance, ps.eventgroup_);
1229 if (its_info) {
1230 discovery_->subscribe(
1231 _service,
1232 _instance,
1233 ps.eventgroup_,
1234 _major,
1235 configured_ttl,
1236 its_info->is_selective() ? get_client() : VSOMEIP_ROUTING_CLIENT,
1237 its_info);
1238 }
1239 }
1240 }
1241 }
1242 }
1243 }
1244 host_->on_availability(_service, _instance, _is_available, _major, _minor);
1245 }
1246
1247
offer_service_remotely(service_t _service,instance_t _instance,std::uint16_t _port,bool _reliable,bool _magic_cookies_enabled)1248 bool routing_manager_impl::offer_service_remotely(service_t _service,
1249 instance_t _instance,
1250 std::uint16_t _port,
1251 bool _reliable,
1252 bool _magic_cookies_enabled) {
1253 bool ret = true;
1254
1255 if(!is_available(_service, _instance, ANY_MAJOR)) {
1256 VSOMEIP_ERROR << __func__ << ": Service ["
1257 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1258 << std::hex << std::setw(4) << std::setfill('0') << _instance
1259 << "] is not offered locally! Won't offer it remotely.";
1260 ret = false;
1261 } else {
1262 // update service info in configuration
1263 if (!configuration_->remote_offer_info_add(_service, _instance, _port,
1264 _reliable, _magic_cookies_enabled)) {
1265 ret = false;
1266 } else {
1267 // trigger event registration again to create shadow events
1268 const client_t its_offering_client = find_local_client(_service, _instance);
1269 if (its_offering_client == VSOMEIP_ROUTING_CLIENT) {
1270 VSOMEIP_ERROR << __func__ << " didn't find offering client for service ["
1271 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1272 << std::hex << std::setw(4) << std::setfill('0') << _instance
1273 << "]";
1274 ret = false;
1275 } else {
1276 if (!stub_->send_provided_event_resend_request(its_offering_client,
1277 pending_remote_offer_add(_service, _instance))) {
1278 VSOMEIP_ERROR << __func__ << ": Couldn't send event resend"
1279 << "request to client 0x" << std::hex << std::setw(4)
1280 << std::setfill('0') << its_offering_client << " providing service ["
1281 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1282 << std::hex << std::setw(4) << std::setfill('0') << _instance
1283 << "]";
1284
1285 ret = false;
1286 }
1287 }
1288 }
1289 }
1290 return ret;
1291 }
1292
stop_offer_service_remotely(service_t _service,instance_t _instance,std::uint16_t _port,bool _reliable,bool _magic_cookies_enabled)1293 bool routing_manager_impl::stop_offer_service_remotely(service_t _service,
1294 instance_t _instance,
1295 std::uint16_t _port,
1296 bool _reliable,
1297 bool _magic_cookies_enabled) {
1298 bool ret = true;
1299 bool service_still_offered_remote(false);
1300 // update service configuration
1301 if (!configuration_->remote_offer_info_remove(_service, _instance, _port,
1302 _reliable, _magic_cookies_enabled, &service_still_offered_remote)) {
1303 VSOMEIP_ERROR << __func__ << " couldn't remove remote offer info for service ["
1304 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1305 << std::hex << std::setw(4) << std::setfill('0') << _instance
1306 << "] from configuration";
1307 ret = false;
1308 }
1309 std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
1310 std::shared_ptr<endpoint> its_server_endpoint;
1311 if (its_info) {
1312 its_server_endpoint = its_info->get_endpoint(_reliable);
1313 }
1314 // don't deregister events if the service is still offered remotely
1315 if (!service_still_offered_remote) {
1316 const client_t its_offering_client = find_local_client(_service, _instance);
1317 major_version_t its_major(0);
1318 minor_version_t its_minor(0);
1319 if (its_info) {
1320 its_major = its_info->get_major();
1321 its_minor = its_info->get_minor();
1322 }
1323 // unset payload and clear subcribers
1324 routing_manager_base::stop_offer_service(its_offering_client,
1325 _service, _instance, its_major, its_minor);
1326 // unregister events
1327 for (const event_t its_event_id : find_events(_service, _instance)) {
1328 unregister_shadow_event(its_offering_client, _service, _instance,
1329 its_event_id, true);
1330 }
1331 clear_targets_and_pending_sub_from_eventgroups(_service, _instance);
1332 clear_remote_subscriber(_service, _instance);
1333
1334 if (discovery_ && its_info) {
1335 discovery_->stop_offer_service(its_info);
1336 its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable);
1337 }
1338 } else {
1339 // service is still partly offered
1340 if (discovery_ && its_info) {
1341 std::shared_ptr<serviceinfo> its_copied_info =
1342 std::make_shared<serviceinfo>(*its_info);
1343 its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable);
1344 // ensure to not send StopOffer for endpoint on which the service is
1345 // still offered
1346 its_copied_info->set_endpoint(std::shared_ptr<endpoint>(), !_reliable);
1347 discovery_->stop_offer_service(its_copied_info);
1348 }
1349 }
1350
1351 cleanup_server_endpoint(_service, its_server_endpoint);
1352 return ret;
1353 }
1354
on_message(const byte_t * _data,length_t _size,endpoint * _receiver,const boost::asio::ip::address & _destination,client_t _bound_client,credentials_t _credentials,const boost::asio::ip::address & _remote_address,std::uint16_t _remote_port)1355 void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
1356 endpoint *_receiver, const boost::asio::ip::address &_destination,
1357 client_t _bound_client, credentials_t _credentials,
1358 const boost::asio::ip::address &_remote_address,
1359 std::uint16_t _remote_port) {
1360 #if 0
1361 std::stringstream msg;
1362 msg << "rmi::on_message: ";
1363 for (uint32_t i = 0; i < _size; ++i)
1364 msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
1365 VSOMEIP_INFO << msg.str();
1366 #endif
1367 (void)_bound_client;
1368 service_t its_service;
1369 method_t its_method;
1370 uint8_t its_check_status = e2e::profile_interface::generic_check_status::E2E_OK;
1371 instance_t its_instance(0x0);
1372 #ifdef USE_DLT
1373 bool is_forwarded(true);
1374 #endif
1375 if (_size >= VSOMEIP_SOMEIP_HEADER_SIZE) {
1376 its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
1377 _data[VSOMEIP_SERVICE_POS_MAX]);
1378 if (its_service == VSOMEIP_SD_SERVICE) {
1379 its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
1380 _data[VSOMEIP_METHOD_POS_MAX]);
1381 if (discovery_ && its_method == sd::method) {
1382 if (configuration_->get_sd_port() == _remote_port) {
1383 if (!_remote_address.is_unspecified()) {
1384 discovery_->on_message(_data, _size, _remote_address, _destination);
1385 } else {
1386 VSOMEIP_ERROR << "Ignored SD message from unknown address.";
1387 }
1388 } else {
1389 VSOMEIP_ERROR << "Ignored SD message from unknown port ("
1390 << _remote_port << ")";
1391 }
1392 }
1393 } else {
1394 if(_destination.is_multicast()) {
1395 its_instance = ep_mgr_impl_->find_instance_multicast(its_service, _remote_address);
1396 } else {
1397 its_instance = ep_mgr_impl_->find_instance(its_service, _receiver);
1398 }
1399 if (its_instance == 0xFFFF) {
1400 its_method = VSOMEIP_BYTES_TO_WORD(
1401 _data[VSOMEIP_METHOD_POS_MIN],
1402 _data[VSOMEIP_METHOD_POS_MAX]);
1403 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
1404 _data[VSOMEIP_CLIENT_POS_MIN],
1405 _data[VSOMEIP_CLIENT_POS_MAX]);
1406 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
1407 _data[VSOMEIP_SESSION_POS_MIN],
1408 _data[VSOMEIP_SESSION_POS_MAX]);
1409 boost::system::error_code ec;
1410 VSOMEIP_ERROR << "Received message on invalid port: ["
1411 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
1412 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
1413 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
1414 << std::hex << std::setw(4) << std::setfill('0') << its_client << "."
1415 << std::hex << std::setw(4) << std::setfill('0') << its_session << "] from: "
1416 << _remote_address.to_string(ec) << ":" << std::dec << _remote_port;
1417 }
1418 //Ignore messages with invalid message type
1419 if(_size >= VSOMEIP_MESSAGE_TYPE_POS) {
1420 if(!utility::is_valid_message_type(static_cast<message_type_e>(_data[VSOMEIP_MESSAGE_TYPE_POS]))) {
1421 VSOMEIP_ERROR << "Ignored SomeIP message with invalid message type.";
1422 return;
1423 }
1424 }
1425 return_code_e return_code = check_error(_data, _size, its_instance);
1426 if(!(_size >= VSOMEIP_MESSAGE_TYPE_POS && utility::is_request_no_return(_data[VSOMEIP_MESSAGE_TYPE_POS]))) {
1427 if (return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) {
1428 send_error(return_code, _data, _size, its_instance,
1429 _receiver->is_reliable(), _receiver,
1430 _remote_address, _remote_port);
1431 return;
1432 }
1433 } else if(return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) {
1434 //Ignore request no response message if an error occured
1435 return;
1436 }
1437
1438 // Security checks if enabled!
1439 if (security::get()->is_enabled()) {
1440 if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
1441 client_t requester = VSOMEIP_BYTES_TO_WORD(
1442 _data[VSOMEIP_CLIENT_POS_MIN],
1443 _data[VSOMEIP_CLIENT_POS_MAX]);
1444 its_method = VSOMEIP_BYTES_TO_WORD(
1445 _data[VSOMEIP_METHOD_POS_MIN],
1446 _data[VSOMEIP_METHOD_POS_MAX]);
1447 if (!configuration_->is_offered_remote(its_service, its_instance)) {
1448 VSOMEIP_WARNING << std::hex << "Security: Received a remote request "
1449 << "for service/instance " << its_service << "/" << its_instance
1450 << " which isn't offered remote ~> Skip message!";
1451 return;
1452 }
1453 if (find_local(requester)) {
1454 VSOMEIP_WARNING << std::hex << "Security: Received a remote request "
1455 << "from client identifier 0x" << requester
1456 << " which is already used locally ~> Skip message!";
1457 return;
1458 }
1459 if (!security::get()->is_remote_client_allowed()) {
1460 // check if policy allows remote requests.
1461 VSOMEIP_WARNING << "routing_manager_impl::on_message: "
1462 << std::hex << "Security: Remote client with client ID 0x" << requester
1463 << " is not allowed to communicate with service/instance/method "
1464 << its_service << "/" << its_instance
1465 << "/" << its_method;
1466 return;
1467 }
1468 }
1469 }
1470 if (e2e_provider_) {
1471 its_method = VSOMEIP_BYTES_TO_WORD(
1472 _data[VSOMEIP_METHOD_POS_MIN],
1473 _data[VSOMEIP_METHOD_POS_MAX]);
1474 #ifndef ANDROID
1475 if (e2e_provider_->is_checked({its_service, its_method})) {
1476 auto its_base = e2e_provider_->get_protection_base({its_service, its_method});
1477 e2e_buffer its_buffer(_data + its_base, _data + _size);
1478 e2e_provider_->check({its_service, its_method},
1479 its_buffer, its_instance, its_check_status);
1480
1481 if (its_check_status != e2e::profile_interface::generic_check_status::E2E_OK) {
1482 VSOMEIP_INFO << "E2E protection: CRC check failed for service: "
1483 << std::hex << its_service << " method: " << its_method;
1484 }
1485 }
1486 #endif
1487 }
1488 // Common way of message handling
1489 #ifdef USE_DLT
1490 is_forwarded =
1491 #endif
1492 on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(),
1493 _bound_client, _credentials, its_check_status, true);
1494 }
1495 }
1496 #ifdef USE_DLT
1497 if (is_forwarded) {
1498 const uint16_t its_data_size
1499 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
1500
1501 trace::header its_header;
1502 const boost::asio::ip::address_v4 its_remote_address =
1503 _remote_address.is_v4() ? _remote_address.to_v4() :
1504 boost::asio::ip::address_v4::from_string("6.6.6.6");
1505 trace::protocol_e its_protocol =
1506 _receiver->is_local() ? trace::protocol_e::local :
1507 _receiver->is_reliable() ? trace::protocol_e::tcp :
1508 trace::protocol_e::udp;
1509 its_header.prepare(its_remote_address, _remote_port, its_protocol, false,
1510 its_instance);
1511 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data,
1512 its_data_size);
1513 }
1514 #endif
1515 }
1516
on_message(service_t _service,instance_t _instance,const byte_t * _data,length_t _size,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _check_status,bool _is_from_remote)1517 bool routing_manager_impl::on_message(
1518 service_t _service, instance_t _instance,
1519 const byte_t *_data, length_t _size,
1520 bool _reliable, client_t _bound_client,
1521 credentials_t _credentials,
1522 uint8_t _check_status,
1523 bool _is_from_remote) {
1524 #if 0
1525 std::stringstream msg;
1526 msg << "rmi::on_message("
1527 << std::hex << std::setw(4) << std::setfill('0')
1528 << _service << ", " << _instance << "): ";
1529 for (uint32_t i = 0; i < _size; ++i)
1530 msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
1531 VSOMEIP_INFO << msg.str();
1532 #endif
1533 client_t its_client;
1534 bool is_forwarded(true);
1535
1536 if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
1537 its_client = find_local_client(_service, _instance);
1538 } else {
1539 its_client = VSOMEIP_BYTES_TO_WORD(
1540 _data[VSOMEIP_CLIENT_POS_MIN],
1541 _data[VSOMEIP_CLIENT_POS_MAX]);
1542 }
1543
1544 if (utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
1545 is_forwarded = deliver_notification(_service, _instance, _data, _size,
1546 _reliable, _bound_client, _credentials, _check_status, _is_from_remote);
1547 } else if (its_client == host_->get_client()) {
1548 deliver_message(_data, _size, _instance,
1549 _reliable, _bound_client, _credentials, _check_status, _is_from_remote);
1550 } else {
1551 send(its_client, _data, _size, _instance, _reliable,
1552 _bound_client, _credentials, _check_status, _is_from_remote); //send to proxy
1553 }
1554 return is_forwarded;
1555 }
1556
on_notification(client_t _client,service_t _service,instance_t _instance,const byte_t * _data,length_t _size,bool _notify_one)1557 void routing_manager_impl::on_notification(client_t _client,
1558 service_t _service, instance_t _instance,
1559 const byte_t *_data, length_t _size, bool _notify_one) {
1560 event_t its_event_id = VSOMEIP_BYTES_TO_WORD(
1561 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
1562 std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id);
1563 if (its_event) {
1564 uint32_t its_length = utility::get_payload_size(_data, _size);
1565 std::shared_ptr<payload> its_payload =
1566 runtime::get()->create_payload(
1567 &_data[VSOMEIP_PAYLOAD_POS],
1568 its_length);
1569
1570 if (_notify_one) {
1571 notify_one(_service, _instance, its_event->get_event(),
1572 its_payload, _client, true
1573 #ifdef VSOMEIP_ENABLE_COMPAT
1574 , false
1575 #endif
1576 );
1577 } else {
1578 if (its_event->is_field()) {
1579 if (!its_event->set_payload_notify_pending(its_payload)) {
1580 its_event->set_payload(its_payload, false);
1581 }
1582 } else {
1583 its_event->set_payload(its_payload, false, true);
1584 }
1585 }
1586 }
1587 }
1588
is_last_stop_callback(const uint32_t _callback_id)1589 bool routing_manager_impl::is_last_stop_callback(const uint32_t _callback_id) {
1590 bool last_callback(false);
1591 auto found_id = callback_counts_.find(_callback_id);
1592 if (found_id != callback_counts_.end()) {
1593 found_id->second--;
1594 if (found_id->second == 0) {
1595 last_callback = true;
1596 }
1597 }
1598 if (last_callback) {
1599 callback_counts_.erase(_callback_id);
1600 }
1601 return last_callback;
1602 }
1603
/**
 * Handles a request of client _client to stop offering
 * [_service._instance._major._minor].
 *
 * Steps performed here:
 *  1. Remove the instance from local_services_ if it is stored for _client
 *     (a warning is logged if version or client do not match the stored offer).
 *  2. Forward the stop offer to routing_manager_base.
 *  3. If a serviceinfo exists, ask its reliable/unreliable server endpoints to
 *     prepare for stop; the remaining teardown happens in the callbacks that
 *     are handed to endpoint::prepare_stop().
 *  4. Clear the remote subscriptions of all eventgroups of the instance.
 *
 * erase_offer_command() is called once all callbacks registered under the
 * same callback id have run, or directly when no serviceinfo / no endpoint
 * exists.
 */
void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _service,
        instance_t _instance, major_version_t _major, minor_version_t _minor) {
    {
        std::lock_guard<std::mutex> its_lock(local_services_mutex_);
        auto found_service = local_services_.find(_service);
        if (found_service != local_services_.end()) {
            auto found_instance = found_service->second.find(_instance);
            if (found_instance != found_service->second.end()) {
                // Stored tuple is compared as (major, minor, offering client).
                if (   std::get<0>(found_instance->second) != _major
                    || std::get<1>(found_instance->second) != _minor
                    || std::get<2>(found_instance->second) != _client) {
                    VSOMEIP_WARNING
                            << "routing_manager_impl::on_stop_offer_service: "
                            << "trying to delete service not matching exactly "
                            << "the one offered previously: " << "[" << std::hex
                            << std::setw(4) << std::setfill('0') << _service
                            << "." << _instance << "." << std::dec
                            << static_cast<std::uint32_t>(_major)
                            << "." << _minor << "] by application: "
                            << std::hex << std::setw(4) << std::setfill('0')
                            << _client << ". Stored: [" << std::hex
                            << std::setw(4) << std::setfill('0') << _service
                            << "." << _instance << "." << std::dec
                            << static_cast<std::uint32_t>(std::get<0>(found_instance->second)) << "."
                            << std::get<1>(found_instance->second)
                            << "] by application: " << std::hex << std::setw(4)
                            << std::setfill('0') << std::get<2>(found_instance->second);
                }
                // Only the client stored as the offerer may remove the entry;
                // a version mismatch alone does not prevent the removal.
                if (std::get<2>(found_instance->second) == _client) {
                    found_service->second.erase(_instance);
                    if (found_service->second.size() == 0) {
                        local_services_.erase(_service);
                    }
                }
            }
        }
    }

    routing_manager_base::stop_offer_service(_client, _service, _instance,
            _major, _minor);

    /**
     * Hold the reliable & unreliable server endpoints from the service info,
     * because if "del_routing_info" is called those entries could be freed
     * and we can't be sure this happens synchronously when SD is active.
     * After triggering "del_routing_info" these endpoints get cleaned up
     * within this method if they are no longer used by any other local service.
     */
    std::shared_ptr<endpoint> its_reliable_endpoint;
    std::shared_ptr<endpoint> its_unreliable_endpoint;
    std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
    if (its_info) {
        its_reliable_endpoint = its_info->get_endpoint(true);
        its_unreliable_endpoint = its_info->get_endpoint(false);
        // Process-wide counter; each invocation gets its own id under which
        // the number of outstanding callbacks is tracked in callback_counts_.
        static std::atomic<uint32_t> callback_id(0);
        const uint32_t its_callback_id = ++callback_id;

        // Records which of the two endpoints already invoked the callback.
        struct ready_to_stop_t {
            ready_to_stop_t() : reliable_(false), unreliable_(false){}
            std::atomic<bool> reliable_;
            std::atomic<bool> unreliable_;
        };
        auto ready_to_stop = std::make_shared<ready_to_stop_t>();
        // Copied into the callbacks below.
        // NOTE(review): presumably this keeps the routing manager alive until
        // the callbacks have run - confirm.
        auto ptr = shared_from_this();

        // NOTE(review): the capture default is by reference ("&"). From the
        // body it looks like only members (via this) and the explicitly
        // copied values are used, so no local of this function should be
        // referenced after it returned - confirm before adding new uses.
        auto callback = [&, its_callback_id, ptr, its_info, its_reliable_endpoint, its_unreliable_endpoint,
                         ready_to_stop, _service, _instance, _major, _minor]
                                (std::shared_ptr<endpoint> _endpoint, service_t _stopped_service) {
            (void)_stopped_service;
            if (its_reliable_endpoint && its_reliable_endpoint == _endpoint) {
                ready_to_stop->reliable_ = true;
            }
            if (its_unreliable_endpoint && its_unreliable_endpoint == _endpoint) {
                ready_to_stop->unreliable_ = true;
            }
            // Not all existing endpoints have reported yet: only account for
            // this callback and wait for the other one.
            if ((its_unreliable_endpoint && !ready_to_stop->unreliable_) ||
                (its_reliable_endpoint && !ready_to_stop->reliable_)) {
                {
                    std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
                    if (is_last_stop_callback(its_callback_id)) {
                        erase_offer_command(_service, _instance);
                    }
                }
                return;
            }

            // Only announce the stop via SD if the stored version matches the
            // version of this stop offer request.
            if (discovery_) {
                if (its_info->get_major() == _major && its_info->get_minor() == _minor) {
                    discovery_->stop_offer_service(its_info);
                }
            }
            del_routing_info(_service, _instance, (its_reliable_endpoint != nullptr),
                    (its_unreliable_endpoint != nullptr));

            for (const auto& ep: {its_reliable_endpoint, its_unreliable_endpoint}) {
                if (ep) {
                    if (ep_mgr_impl_->remove_instance(_service, ep.get())) {
                        {
                            // One more callback (the shutdown handler below)
                            // is now outstanding for this id.
                            std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
                            callback_counts_[its_callback_id]++;
                        }
                        // last instance -> pass ANY_SERVICE and shutdown completely
                        // NOTE(review): the original comment said ANY_INSTANCE
                        // while the call passes ANY_SERVICE - confirm intent.
                        ep->prepare_stop(
                            [&, _service, _instance, its_callback_id, ptr, its_reliable_endpoint, its_unreliable_endpoint]
                                (std::shared_ptr<endpoint> _endpoint,
                                 service_t _stopped_service) {
                                (void)_stopped_service;
                                if (ep_mgr_impl_->remove_server_endpoint(
                                        _endpoint->get_local_port(),
                                        _endpoint->is_reliable())) {
                                    _endpoint->stop();
                                }
                                {
                                    std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
                                    if (is_last_stop_callback(its_callback_id)) {
                                        erase_offer_command(_service, _instance);
                                    }
                                }
                            }, ANY_SERVICE);
                    }
                    // Clear service info and service group
                    clear_service_info(_service, _instance, ep->is_reliable());
                }
            }
            {
                std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
                if (is_last_stop_callback(its_callback_id)) {
                    erase_offer_command(_service, _instance);
                }
            }
        };

        // Determine the callback count: one expected callback per existing
        // endpoint, registered before any prepare_stop() call is made.
        for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) {
            if (ep) {
                std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
                auto found_id = callback_counts_.find(its_callback_id);
                if (found_id != callback_counts_.end()) {
                    found_id->second++;
                } else {
                    callback_counts_[its_callback_id] = 1;
                }
            }
        }
        for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) {
            if (ep) {
                ep->prepare_stop(callback, _service);
            }
        }

        // No endpoint at all: no callback will run, so finish right here.
        if (!its_reliable_endpoint && !its_unreliable_endpoint) {
            {
                std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
                callback_counts_.erase(its_callback_id);
            }
            erase_offer_command(_service, _instance);
        }

        // Collect the eventgroups under the lock, clear their remote
        // subscriptions after the lock has been released.
        std::set<std::shared_ptr<eventgroupinfo> > its_eventgroup_info_set;
        {
            std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_);
            auto find_service = eventgroups_.find(_service);
            if (find_service != eventgroups_.end()) {
                auto find_instance = find_service->second.find(_instance);
                if (find_instance != find_service->second.end()) {
                    for (auto e : find_instance->second) {
                        its_eventgroup_info_set.insert(e.second);
                    }
                }
            }
        }

        for (auto e : its_eventgroup_info_set) {
            e->clear_remote_subscriptions();
        }
    } else {
        // Unknown service instance: nothing to tear down.
        erase_offer_command(_service, _instance);
    }
}
1783
deliver_message(const byte_t * _data,length_t _size,instance_t _instance,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _status_check,bool _is_from_remote)1784 bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
1785 instance_t _instance, bool _reliable, client_t _bound_client, credentials_t _credentials,
1786 uint8_t _status_check, bool _is_from_remote) {
1787 bool is_delivered(false);
1788 std::uint32_t its_sender_uid = std::get<0>(_credentials);
1789 std::uint32_t its_sender_gid = std::get<1>(_credentials);
1790
1791 auto its_deserializer = get_deserializer();
1792 its_deserializer->set_data(_data, _size);
1793 std::shared_ptr<message_impl> its_message(its_deserializer->deserialize_message());
1794 its_deserializer->reset();
1795 put_deserializer(its_deserializer);
1796
1797 if (its_message) {
1798 its_message->set_instance(_instance);
1799 its_message->set_reliable(_reliable);
1800 its_message->set_check_result(_status_check);
1801 its_message->set_uid(std::get<0>(_credentials));
1802 its_message->set_gid(std::get<1>(_credentials));
1803
1804 if (!_is_from_remote) {
1805 if (utility::is_notification(its_message->get_message_type())) {
1806 if (!is_response_allowed(_bound_client, its_message->get_service(),
1807 its_message->get_instance(), its_message->get_method())) {
1808 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1809 << " : routing_manager_impl::deliver_message: "
1810 << std::hex << " received a notification from client 0x" << _bound_client
1811 << " which does not offer service/instance/event "
1812 << its_message->get_service() << "/" << its_message->get_instance()
1813 << "/" << its_message->get_method()
1814 << " ~> Skip message!";
1815 return false;
1816 } else {
1817 if (!security::get()->is_client_allowed(own_uid_, own_gid_,
1818 get_client(), its_message->get_service(),
1819 its_message->get_instance(), its_message->get_method())) {
1820 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1821 << " : routing_manager_impl::deliver_message: "
1822 << " isn't allowed to receive a notification from service/instance/event "
1823 << its_message->get_service() << "/" << its_message->get_instance()
1824 << "/" << its_message->get_method()
1825 << " respectively from client 0x" << _bound_client
1826 << " ~> Skip message!";
1827 return false;
1828 }
1829 }
1830 } else if (utility::is_request(its_message->get_message_type())) {
1831 if (security::get()->is_enabled()
1832 && its_message->get_client() != _bound_client) {
1833 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1834 << " : routing_manager_impl::deliver_message:"
1835 << " received a request from client 0x" << std::setw(4) << std::setfill('0')
1836 << its_message->get_client() << " to service/instance/method "
1837 << its_message->get_service() << "/" << its_message->get_instance()
1838 << "/" << its_message->get_method() << " which doesn't match the bound client 0x"
1839 << std::setw(4) << std::setfill('0') << _bound_client
1840 << " ~> Skip message!";
1841 return false;
1842 }
1843
1844 if (!security::get()->is_client_allowed(its_sender_uid, its_sender_gid,
1845 its_message->get_client(), its_message->get_service(),
1846 its_message->get_instance(), its_message->get_method())) {
1847 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1848 << " : routing_manager_impl::deliver_message: "
1849 << " isn't allowed to send a request to service/instance/method "
1850 << its_message->get_service() << "/" << its_message->get_instance()
1851 << "/" << its_message->get_method()
1852 << " ~> Skip message!";
1853 return false;
1854 }
1855 } else { // response
1856 if (!is_response_allowed(_bound_client, its_message->get_service(),
1857 its_message->get_instance(), its_message->get_method())) {
1858 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1859 << " : routing_manager_impl::deliver_message: "
1860 << " received a response from client 0x" << _bound_client
1861 << " which does not offer service/instance/method "
1862 << its_message->get_service() << "/" << its_message->get_instance()
1863 << "/" << its_message->get_method()
1864 << " ~> Skip message!";
1865 return false;
1866 } else {
1867 if (!security::get()->is_client_allowed(own_uid_, own_gid_,
1868 get_client(), its_message->get_service(),
1869 its_message->get_instance(), its_message->get_method())) {
1870 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1871 << " : routing_manager_impl::deliver_message: "
1872 << " isn't allowed to receive a response from service/instance/method "
1873 << its_message->get_service() << "/" << its_message->get_instance()
1874 << "/" << its_message->get_method()
1875 << " respectively from client 0x" << _bound_client
1876 << " ~> Skip message!";
1877 return false;
1878 }
1879 }
1880 }
1881 } else {
1882 if (!security::get()->is_remote_client_allowed()) {
1883 // if the message is from remote, check if
1884 // policy allows remote requests.
1885 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1886 << " : routing_manager_impl::deliver_message: "
1887 << std::hex << "Remote clients are not allowed"
1888 << " to communicate with service/instance/method "
1889 << its_message->get_service() << "/" << its_message->get_instance()
1890 << "/" << its_message->get_method()
1891 << " respectively with client 0x" << get_client()
1892 << " ~> Skip message!";
1893 return false;
1894 } else if (utility::is_notification(its_message->get_message_type())) {
1895 if (!security::get()->is_client_allowed(own_uid_, own_gid_,
1896 get_client(), its_message->get_service(),
1897 its_message->get_instance(), its_message->get_method())) {
1898 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1899 << " : routing_manager_impl::deliver_message: "
1900 << " isn't allowed to receive a notification from service/instance/event "
1901 << its_message->get_service() << "/" << its_message->get_instance()
1902 << "/" << its_message->get_method()
1903 << " respectively from remote client"
1904 << " ~> Skip message!";
1905 return false;
1906 }
1907 }
1908 }
1909
1910 host_->on_message(std::move(its_message));
1911 is_delivered = true;
1912 } else {
1913 VSOMEIP_ERROR << "Routing manager: deliver_message: "
1914 << "SomeIP-Header deserialization failed!";
1915 }
1916 return is_delivered;
1917 }
1918
deliver_notification(service_t _service,instance_t _instance,const byte_t * _data,length_t _length,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _status_check,bool _is_from_remote)1919 bool routing_manager_impl::deliver_notification(
1920 service_t _service, instance_t _instance,
1921 const byte_t *_data, length_t _length,
1922 bool _reliable, client_t _bound_client,
1923 credentials_t _credentials,
1924 uint8_t _status_check, bool _is_from_remote) {
1925 event_t its_event_id = VSOMEIP_BYTES_TO_WORD(
1926 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
1927 client_t its_client_id = VSOMEIP_BYTES_TO_WORD(
1928 _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
1929
1930 std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id);
1931 if (its_event) {
1932 if (!its_event->is_provided()) {
1933 if (its_event->get_subscribers().size() == 0) {
1934 // no subscribers for this specific event / check subscriptions
1935 // to other events of the event's eventgroups
1936 bool cache_event = false;
1937 for (const auto& eg : its_event->get_eventgroups()) {
1938 std::shared_ptr<eventgroupinfo> egi = find_eventgroup(_service, _instance, eg);
1939 if (egi) {
1940 for (const auto &e : egi->get_events()) {
1941 cache_event = (e->get_subscribers().size() > 0);
1942 if (cache_event) {
1943 break;
1944 }
1945 }
1946 if (cache_event) {
1947 break;
1948 }
1949 }
1950 }
1951 if (!cache_event) {
1952 VSOMEIP_WARNING << __func__ << ": dropping ["
1953 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1954 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
1955 << std::hex << std::setw(4) << std::setfill('0') << its_event_id
1956 << "]. No subscription to corresponding eventgroup.";
1957 return true; // as there is nothing to do
1958 }
1959 }
1960 const uint32_t its_length(utility::get_payload_size(_data, _length));
1961 if (its_length != _length - VSOMEIP_FULL_HEADER_SIZE) {
1962 VSOMEIP_ERROR << "Message length mismatch, dropping message!";
1963 return false;
1964 }
1965 std::shared_ptr<payload> its_payload
1966 = runtime::get()->create_payload(&_data[VSOMEIP_PAYLOAD_POS],
1967 its_length);
1968 if (!its_event->set_payload_dont_notify(its_payload)) {
1969 // do not forward the notification as it was filtered
1970 return false;
1971 }
1972 }
1973
1974 // incoming events statistics
1975 (void) insert_event_statistics(
1976 _service,
1977 _instance,
1978 its_event_id,
1979 utility::get_payload_size(_data, _length));
1980
1981 if (its_event->get_type() != event_type_e::ET_SELECTIVE_EVENT) {
1982 for (const auto& its_local_client : its_event->get_subscribers()) {
1983 if (its_local_client == host_->get_client()) {
1984 deliver_message(_data, _length, _instance, _reliable,
1985 _bound_client, _credentials, _status_check, _is_from_remote);
1986 } else {
1987 std::shared_ptr<endpoint> its_local_target = find_local(its_local_client);
1988 if (its_local_target) {
1989 send_local(its_local_target, VSOMEIP_ROUTING_CLIENT,
1990 _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check);
1991 }
1992 }
1993 }
1994 } else {
1995 // TODO: Check whether it makes more sense to set the client id
1996 // for internal selective events. This would create some extra
1997 // effort but we could avoid this hack.
1998 if (its_client_id == VSOMEIP_ROUTING_CLIENT)
1999 its_client_id = get_client();
2000
2001 auto its_subscribers = its_event->get_subscribers();
2002 if (its_subscribers.find(its_client_id) != its_subscribers.end()) {
2003 if (its_client_id == host_->get_client()) {
2004 deliver_message(_data, _length, _instance, _reliable,
2005 _bound_client, _credentials, _status_check, _is_from_remote);
2006 } else {
2007 std::shared_ptr<endpoint> its_local_target = find_local(its_client_id);
2008 if (its_local_target) {
2009 send_local(its_local_target, VSOMEIP_ROUTING_CLIENT,
2010 _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check);
2011 }
2012 }
2013 }
2014 }
2015 } else {
2016 VSOMEIP_WARNING << __func__ << ": Event ["
2017 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2018 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2019 << std::hex << std::setw(4) << std::setfill('0') << its_event_id << "]"
2020 << " is not registered. The message is dropped.";
2021 }
2022
2023 return true;
2024 }
2025
find_eventgroup(service_t _service,instance_t _instance,eventgroup_t _eventgroup) const2026 std::shared_ptr<eventgroupinfo> routing_manager_impl::find_eventgroup(
2027 service_t _service, instance_t _instance,
2028 eventgroup_t _eventgroup) const {
2029 return routing_manager_base::find_eventgroup(_service, _instance, _eventgroup);
2030 }
2031
create_service_discovery_endpoint(const std::string & _address,uint16_t _port,bool _reliable)2032 std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoint(
2033 const std::string &_address, uint16_t _port, bool _reliable) {
2034 std::shared_ptr<endpoint> its_service_endpoint =
2035 ep_mgr_impl_->find_server_endpoint(_port, _reliable);
2036 if (!its_service_endpoint) {
2037 try {
2038 its_service_endpoint =
2039 ep_mgr_impl_->create_server_endpoint(_port,
2040 _reliable, true);
2041
2042 if (its_service_endpoint) {
2043 sd_info_ = std::make_shared<serviceinfo>(
2044 VSOMEIP_SD_SERVICE, VSOMEIP_SD_INSTANCE,
2045 ANY_MAJOR, ANY_MINOR, DEFAULT_TTL,
2046 false); // false, because we do _not_ want to announce it...
2047 sd_info_->set_endpoint(its_service_endpoint, _reliable);
2048 its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE,
2049 _address, _port);
2050 if (!_reliable) {
2051 auto its_udp_server_endpoint_impl = std::dynamic_pointer_cast<
2052 udp_server_endpoint_impl>(its_service_endpoint);
2053 if (its_udp_server_endpoint_impl)
2054 its_udp_server_endpoint_impl->join(_address);
2055 }
2056 } else {
2057 VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. "
2058 "Please check your network configuration.";
2059 }
2060 } catch (const std::exception &e) {
2061 VSOMEIP_ERROR << "Server endpoint creation failed: Service "
2062 "Discovery endpoint could not be created: " << e.what();
2063 }
2064 }
2065 return its_service_endpoint;
2066 }
2067
get_offered_services() const2068 services_t routing_manager_impl::get_offered_services() const {
2069 services_t its_services;
2070 for (const auto& s : get_services()) {
2071 for (const auto& i : s.second) {
2072 if (i.second->is_local()) {
2073 its_services[s.first][i.first] = i.second;
2074 }
2075 }
2076 }
2077 return its_services;
2078 }
2079
get_offered_service(service_t _service,instance_t _instance) const2080 std::shared_ptr<serviceinfo> routing_manager_impl::get_offered_service(
2081 service_t _service, instance_t _instance) const {
2082 std::shared_ptr<serviceinfo> its_info;
2083 its_info = find_service(_service, _instance);
2084 if (its_info && !its_info->is_local()) {
2085 its_info.reset();
2086 }
2087 return its_info;
2088 }
2089
2090 std::map<instance_t, std::shared_ptr<serviceinfo>>
get_offered_service_instances(service_t _service) const2091 routing_manager_impl::get_offered_service_instances(service_t _service) const {
2092 std::map<instance_t, std::shared_ptr<serviceinfo>> its_instances;
2093 const services_t its_services(get_services());
2094 const auto found_service = its_services.find(_service);
2095 if (found_service != its_services.end()) {
2096 for (const auto& i : found_service->second) {
2097 if (i.second->is_local()) {
2098 its_instances[i.first] = i.second;
2099 }
2100 }
2101 }
2102 return its_instances;
2103 }
2104
2105 ///////////////////////////////////////////////////////////////////////////////
2106 // PRIVATE
2107 ///////////////////////////////////////////////////////////////////////////////
init_service_info(service_t _service,instance_t _instance,bool _is_local_service)2108 void routing_manager_impl::init_service_info(
2109 service_t _service, instance_t _instance, bool _is_local_service) {
2110 std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
2111 if (!its_info) {
2112 VSOMEIP_ERROR << "routing_manager_impl::init_service_info: couldn't "
2113 "find serviceinfo for service: ["
2114 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2115 << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"
2116 << " is_local_service=" << _is_local_service;
2117 return;
2118 }
2119 if (configuration_) {
2120 // Create server endpoints for local services only
2121 if (_is_local_service) {
2122 const bool is_someip = configuration_->is_someip(_service, _instance);
2123 uint16_t its_reliable_port = configuration_->get_reliable_port(
2124 _service, _instance);
2125 bool _is_found(false);
2126 if (ILLEGAL_PORT != its_reliable_port) {
2127 std::shared_ptr<endpoint> its_reliable_endpoint =
2128 ep_mgr_impl_->find_or_create_server_endpoint(
2129 its_reliable_port, true, is_someip, _service,
2130 _instance, _is_found);
2131 if (its_reliable_endpoint) {
2132 its_info->set_endpoint(its_reliable_endpoint, true);
2133 }
2134 }
2135 uint16_t its_unreliable_port = configuration_->get_unreliable_port(
2136 _service, _instance);
2137 if (ILLEGAL_PORT != its_unreliable_port) {
2138 std::shared_ptr<endpoint> its_unreliable_endpoint =
2139 ep_mgr_impl_->find_or_create_server_endpoint(
2140 its_unreliable_port, false, is_someip, _service,
2141 _instance, _is_found);
2142 if (its_unreliable_endpoint) {
2143 its_info->set_endpoint(its_unreliable_endpoint, false);
2144 }
2145 }
2146
2147 if (ILLEGAL_PORT == its_reliable_port
2148 && ILLEGAL_PORT == its_unreliable_port) {
2149 VSOMEIP_INFO << "Port configuration missing for ["
2150 << std::hex << _service << "." << _instance
2151 << "]. Service is internal.";
2152 }
2153 }
2154 } else {
2155 VSOMEIP_ERROR << "Missing vsomeip configuration.";
2156 }
2157 }
2158
remove_local(client_t _client,bool _remove_uid)2159 void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) {
2160 auto clients_subscriptions = get_subscriptions(_client);
2161 {
2162 std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
2163 for (const auto& s : clients_subscriptions) {
2164 remote_subscription_state_.erase(std::tuple_cat(s, std::make_tuple(_client)));
2165 }
2166 }
2167 routing_manager_base::remove_local(_client, clients_subscriptions, _remove_uid);
2168
2169 std::forward_list<std::pair<service_t, instance_t>> services_to_release_;
2170 {
2171 std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2172 auto its_client = requested_services_.find(_client);
2173 if (its_client != requested_services_.end()) {
2174 for (const auto& its_service : its_client->second) {
2175 for (const auto& its_instance : its_service.second) {
2176 services_to_release_.push_front(
2177 { its_service.first, its_instance.first });
2178 }
2179 }
2180 }
2181 }
2182 for (const auto &s : services_to_release_) {
2183 release_service(_client, s.first, s.second);
2184 }
2185 }
2186
is_field(service_t _service,instance_t _instance,event_t _event) const2187 bool routing_manager_impl::is_field(service_t _service, instance_t _instance,
2188 event_t _event) const {
2189 std::lock_guard<std::mutex> its_lock(events_mutex_);
2190 auto find_service = events_.find(_service);
2191 if (find_service != events_.end()) {
2192 auto find_instance = find_service->second.find(_instance);
2193 if (find_instance != find_service->second.end()) {
2194 auto find_event = find_instance->second.find(_event);
2195 if (find_event != find_instance->second.end())
2196 return find_event->second->is_field();
2197 }
2198 }
2199 return false;
2200 }
2201
2202 //only called from the SD
add_routing_info(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,ttl_t _ttl,const boost::asio::ip::address & _reliable_address,uint16_t _reliable_port,const boost::asio::ip::address & _unreliable_address,uint16_t _unreliable_port)2203 void routing_manager_impl::add_routing_info(
2204 service_t _service, instance_t _instance,
2205 major_version_t _major, minor_version_t _minor, ttl_t _ttl,
2206 const boost::asio::ip::address &_reliable_address,
2207 uint16_t _reliable_port,
2208 const boost::asio::ip::address &_unreliable_address,
2209 uint16_t _unreliable_port) {
2210
2211 std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
2212 if (routing_state_ == routing_state_e::RS_SUSPENDED) {
2213 VSOMEIP_INFO << "rmi::" << __func__ << " We are suspened --> do nothing.";
2214 return;
2215 }
2216
2217 // Create/Update service info
2218 std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
2219 if (!its_info) {
2220 boost::asio::ip::address its_unicast_address
2221 = configuration_->get_unicast_address();
2222 bool is_local(false);
2223 if (_reliable_port != ILLEGAL_PORT
2224 && its_unicast_address == _reliable_address)
2225 is_local = true;
2226 else if (_unreliable_port != ILLEGAL_PORT
2227 && its_unicast_address == _unreliable_address)
2228 is_local = true;
2229
2230 its_info = create_service_info(_service, _instance, _major, _minor, _ttl, is_local);
2231 init_service_info(_service, _instance, is_local);
2232 } else if (its_info->is_local()) {
2233 // We received a service info for a service which is already offered locally
2234 VSOMEIP_ERROR << "routing_manager_impl::add_routing_info: "
2235 << "rejecting routing info. Remote: "
2236 << ((_reliable_port != ILLEGAL_PORT) ? _reliable_address.to_string()
2237 : _unreliable_address.to_string()) << " is trying to offer ["
2238 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
2239 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
2240 << std::dec << static_cast<std::uint32_t>(_major) << "."
2241 << std::dec << _minor
2242 << "] on port " << ((_reliable_port != ILLEGAL_PORT) ? _reliable_port
2243 : _unreliable_port) << " offered previously on this node: ["
2244 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
2245 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
2246 << std::dec << static_cast<std::uint32_t>(its_info->get_major())
2247 << "." << its_info->get_minor() << "]";
2248 return;
2249 } else {
2250 its_info->set_ttl(_ttl);
2251 }
2252
2253 // Check whether remote services are unchanged
2254 bool is_reliable_known(false);
2255 bool is_unreliable_known(false);
2256 ep_mgr_impl_->is_remote_service_known(_service, _instance, _major,
2257 _minor, _reliable_address, _reliable_port, &is_reliable_known,
2258 _unreliable_address, _unreliable_port, &is_unreliable_known);
2259
2260 bool udp_inserted(false);
2261 bool tcp_inserted(false);
2262 // Add endpoint(s) if necessary
2263 if (_reliable_port != ILLEGAL_PORT && !is_reliable_known) {
2264 std::shared_ptr<endpoint_definition> endpoint_def_tcp
2265 = endpoint_definition::get(_reliable_address, _reliable_port, true, _service, _instance);
2266 if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) {
2267 std::shared_ptr<endpoint_definition> endpoint_def_udp
2268 = endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance);
2269 ep_mgr_impl_->add_remote_service_info(_service, _instance,
2270 endpoint_def_tcp, endpoint_def_udp);
2271 udp_inserted = true;
2272 tcp_inserted = true;
2273 } else {
2274 ep_mgr_impl_->add_remote_service_info(_service, _instance,
2275 endpoint_def_tcp);
2276 tcp_inserted = true;
2277 }
2278
2279 // check if service was requested and establish TCP connection if necessary
2280 {
2281 bool connected(false);
2282 std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2283 for(const auto &client_id : requested_services_) {
2284 auto found_service = client_id.second.find(_service);
2285 if (found_service != client_id.second.end()) {
2286 auto found_instance = found_service->second.find(_instance);
2287 if (found_instance != found_service->second.end()) {
2288 for (const auto &major_minor_pair : found_instance->second) {
2289 if ((major_minor_pair.first == _major
2290 || _major == DEFAULT_MAJOR
2291 || major_minor_pair.first == ANY_MAJOR)
2292 && (major_minor_pair.second <= _minor
2293 || _minor == DEFAULT_MINOR
2294 || major_minor_pair.second == ANY_MINOR)) {
2295 // SWS_SD_00376 establish TCP connection to service
2296 // service is marked as available later in on_connect()
2297 if(!connected) {
2298 if (udp_inserted) {
2299 // atomically create reliable and unreliable endpoint
2300 ep_mgr_impl_->find_or_create_remote_client(
2301 _service, _instance);
2302 } else {
2303 ep_mgr_impl_->find_or_create_remote_client(
2304 _service, _instance, true);
2305 }
2306 connected = true;
2307 }
2308 its_info->add_client(client_id.first);
2309 break;
2310 }
2311 }
2312 }
2313 }
2314 }
2315 }
2316 } else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) {
2317 std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2318 bool connected(false);
2319 for(const auto &client_id : requested_services_) {
2320 auto found_service = client_id.second.find(_service);
2321 if (found_service != client_id.second.end()) {
2322 auto found_instance = found_service->second.find(_instance);
2323 if (found_instance != found_service->second.end()) {
2324 for (const auto &major_minor_pair : found_instance->second) {
2325 if ((major_minor_pair.first == _major
2326 || _major == DEFAULT_MAJOR
2327 || major_minor_pair.first == ANY_MAJOR)
2328 && (major_minor_pair.second <= _minor
2329 || _minor == DEFAULT_MINOR
2330 || major_minor_pair.second == ANY_MINOR)) {
2331 std::shared_ptr<endpoint> ep = its_info->get_endpoint(true);
2332 if (ep) {
2333 if (ep->is_established() &&
2334 !stub_->contained_in_routing_info(
2335 VSOMEIP_ROUTING_CLIENT, _service, _instance,
2336 its_info->get_major(),
2337 its_info->get_minor())) {
2338 on_availability(_service, _instance,
2339 true, its_info->get_major(), its_info->get_minor());
2340 stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
2341 _service, _instance,
2342 its_info->get_major(),
2343 its_info->get_minor());
2344 if (discovery_) {
2345 discovery_->on_endpoint_connected(
2346 _service, _instance, ep);
2347 }
2348 }
2349 } else {
2350 // no endpoint yet, but requested -> create one
2351
2352 // SWS_SD_00376 establish TCP connection to service
2353 // service is marked as available later in on_connect()
2354 if (!connected) {
2355 ep_mgr_impl_->find_or_create_remote_client(
2356 _service, _instance, true);
2357 connected = true;
2358 }
2359 its_info->add_client(client_id.first);
2360 }
2361 break;
2362 }
2363 }
2364 }
2365 }
2366 }
2367 }
2368
2369 if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) {
2370 if (!udp_inserted) {
2371 std::shared_ptr<endpoint_definition> endpoint_def
2372 = endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance);
2373 ep_mgr_impl_->add_remote_service_info(_service, _instance, endpoint_def);
2374 // check if service was requested and increase requester count if necessary
2375 {
2376 bool connected(false);
2377 std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2378 for (const auto &client_id : requested_services_) {
2379 const auto found_service = client_id.second.find(_service);
2380 if (found_service != client_id.second.end()) {
2381 const auto found_instance = found_service->second.find(
2382 _instance);
2383 if (found_instance != found_service->second.end()) {
2384 for (const auto &major_minor_pair : found_instance->second) {
2385 if ((major_minor_pair.first == _major
2386 || _major == DEFAULT_MAJOR
2387 || major_minor_pair.first == ANY_MAJOR)
2388 && (major_minor_pair.second <= _minor
2389 || _minor == DEFAULT_MINOR
2390 || major_minor_pair.second
2391 == ANY_MINOR)) {
2392 if(!connected) {
2393 ep_mgr_impl_->find_or_create_remote_client(_service, _instance,
2394 false);
2395 connected = true;
2396 }
2397 its_info->add_client(client_id.first);
2398 break;
2399 }
2400 }
2401 }
2402 }
2403 }
2404 }
2405 }
2406 if (!is_reliable_known && !tcp_inserted) {
2407 // UDP only service can be marked as available instantly
2408 on_availability(_service, _instance, true, _major, _minor);
2409 stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor);
2410 }
2411 if (discovery_) {
2412 std::shared_ptr<endpoint> ep = its_info->get_endpoint(false);
2413 if (ep && ep->is_established()) {
2414 discovery_->on_endpoint_connected(_service, _instance, ep);
2415 }
2416 }
2417 } else if (_unreliable_port != ILLEGAL_PORT && is_unreliable_known) {
2418 std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2419 for(const auto &client_id : requested_services_) {
2420 auto found_service = client_id.second.find(_service);
2421 if (found_service != client_id.second.end()) {
2422 auto found_instance = found_service->second.find(_instance);
2423 if (found_instance != found_service->second.end()) {
2424 for (const auto &major_minor_pair : found_instance->second) {
2425 if ((major_minor_pair.first == _major
2426 || _major == DEFAULT_MAJOR
2427 || major_minor_pair.first == ANY_MAJOR)
2428 && (major_minor_pair.second <= _minor
2429 || _minor == DEFAULT_MINOR
2430 || major_minor_pair.second == ANY_MINOR)) {
2431 if (_reliable_port == ILLEGAL_PORT && !is_reliable_known &&
2432 !stub_->contained_in_routing_info(
2433 VSOMEIP_ROUTING_CLIENT, _service, _instance,
2434 its_info->get_major(),
2435 its_info->get_minor())) {
2436 on_availability(_service, _instance,
2437 true, its_info->get_major(), its_info->get_minor());
2438 stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
2439 _service, _instance,
2440 its_info->get_major(),
2441 its_info->get_minor());
2442 if (discovery_) {
2443 std::shared_ptr<endpoint> ep = its_info->get_endpoint(false);
2444 if (ep && ep->is_established()) {
2445 discovery_->on_endpoint_connected(
2446 _service, _instance,
2447 ep);
2448 }
2449 }
2450 }
2451 break;
2452 }
2453 }
2454 }
2455 }
2456 }
2457 }
2458 }
2459
del_routing_info(service_t _service,instance_t _instance,bool _has_reliable,bool _has_unreliable)2460 void routing_manager_impl::del_routing_info(service_t _service, instance_t _instance,
2461 bool _has_reliable, bool _has_unreliable) {
2462
2463 std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
2464 if(!its_info)
2465 return;
2466
2467 on_availability(_service, _instance, false,
2468 its_info->get_major(), its_info->get_minor());
2469 stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
2470 its_info->get_major(), its_info->get_minor());
2471 // Implicit unsubscribe
2472
2473 std::vector<std::shared_ptr<event>> its_events;
2474 {
2475 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
2476 auto found_service = eventgroups_.find(_service);
2477 if (found_service != eventgroups_.end()) {
2478 auto found_instance = found_service->second.find(_instance);
2479 if (found_instance != found_service->second.end()) {
2480 for (auto &its_eventgroup : found_instance->second) {
2481 // As the service is gone, all subscriptions to its events
2482 // do no longer exist and the last received payload is no
2483 // longer valid.
2484 for (auto &its_event : its_eventgroup.second->get_events()) {
2485 const auto& its_subscribers = its_event->get_subscribers();
2486 for (const auto its_subscriber : its_subscribers) {
2487 if (its_subscriber != get_client()) {
2488 its_event->remove_subscriber(
2489 its_eventgroup.first, its_subscriber);
2490 }
2491 }
2492 its_events.push_back(its_event);
2493 }
2494 }
2495 }
2496 }
2497 }
2498 for (const auto& e : its_events) {
2499 e->unset_payload(true);
2500 }
2501
2502 {
2503 std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
2504 std::set<std::tuple<
2505 service_t, instance_t, eventgroup_t, client_t> > its_invalid;
2506
2507 for (const auto &its_state : remote_subscription_state_) {
2508 if (std::get<0>(its_state.first) == _service
2509 && std::get<1>(its_state.first) == _instance) {
2510 its_invalid.insert(its_state.first);
2511 }
2512 }
2513
2514 for (const auto &its_key : its_invalid)
2515 remote_subscription_state_.erase(its_key);
2516 }
2517
2518 {
2519 std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
2520 auto found_service = remote_subscribers_.find(_service);
2521 if (found_service != remote_subscribers_.end()) {
2522 if (found_service->second.erase(_instance) > 0 &&
2523 !found_service->second.size()) {
2524 remote_subscribers_.erase(found_service);
2525 }
2526 }
2527 }
2528
2529 if (_has_reliable) {
2530 ep_mgr_impl_->clear_client_endpoints(_service, _instance, true);
2531 ep_mgr_impl_->clear_remote_service_info(_service, _instance, true);
2532 }
2533 if (_has_unreliable) {
2534 ep_mgr_impl_->clear_client_endpoints(_service, _instance, false);
2535 ep_mgr_impl_->clear_remote_service_info(_service, _instance, false);
2536 }
2537
2538 ep_mgr_impl_->clear_multicast_endpoints(_service, _instance);
2539
2540 if (_has_reliable)
2541 clear_service_info(_service, _instance, true);
2542 if (_has_unreliable)
2543 clear_service_info(_service, _instance, false);
2544
2545 // For expired services using only unreliable endpoints that have never been created before
2546 if (!_has_reliable && !_has_unreliable) {
2547 ep_mgr_impl_->clear_remote_service_info(_service, _instance, true);
2548 ep_mgr_impl_->clear_remote_service_info(_service, _instance, false);
2549 clear_service_info(_service, _instance, true);
2550 clear_service_info(_service, _instance, false);
2551 }
2552 }
2553
update_routing_info(std::chrono::milliseconds _elapsed)2554 void routing_manager_impl::update_routing_info(std::chrono::milliseconds _elapsed) {
2555 std::map<service_t, std::vector<instance_t> > its_expired_offers;
2556
2557 {
2558 std::lock_guard<std::mutex> its_lock(services_remote_mutex_);
2559 for (const auto &s : services_remote_) {
2560 for (const auto &i : s.second) {
2561 ttl_t its_ttl = i.second->get_ttl();
2562 if (its_ttl < DEFAULT_TTL) { // do not touch "forever"
2563 std::chrono::milliseconds precise_ttl = i.second->get_precise_ttl();
2564 if (precise_ttl.count() < _elapsed.count() || precise_ttl.count() == 0) {
2565 i.second->set_ttl(0);
2566 its_expired_offers[s.first].push_back(i.first);
2567 } else {
2568 std::chrono::milliseconds its_new_ttl(precise_ttl - _elapsed);
2569 i.second->set_precise_ttl(its_new_ttl);
2570 }
2571 }
2572 }
2573 }
2574 }
2575
2576 for (const auto &s : its_expired_offers) {
2577 for (const auto &i : s.second) {
2578 if (discovery_) {
2579 discovery_->unsubscribe_all(s.first, i);
2580 }
2581 del_routing_info(s.first, i, true, true);
2582 VSOMEIP_INFO << "update_routing_info: elapsed=" << _elapsed.count()
2583 << " : delete service/instance "
2584 << std::hex << std::setw(4) << std::setfill('0') << s.first
2585 << "." << std::hex << std::setw(4) << std::setfill('0') << i;
2586 }
2587 }
2588 }
2589
expire_services(const boost::asio::ip::address & _address)2590 void routing_manager_impl::expire_services(
2591 const boost::asio::ip::address &_address) {
2592 expire_services(_address, configuration::port_range_t(ANY_PORT, ANY_PORT),
2593 false);
2594 }
2595
expire_services(const boost::asio::ip::address & _address,std::uint16_t _port,bool _reliable)2596 void routing_manager_impl::expire_services(
2597 const boost::asio::ip::address &_address, std::uint16_t _port,
2598 bool _reliable) {
2599 expire_services(_address, configuration::port_range_t(_port, _port),
2600 _reliable);
2601 }
2602
expire_services(const boost::asio::ip::address & _address,const configuration::port_range_t & _range,bool _reliable)2603 void routing_manager_impl::expire_services(
2604 const boost::asio::ip::address &_address,
2605 const configuration::port_range_t& _range, bool _reliable) {
2606 std::map<service_t, std::vector<instance_t> > its_expired_offers;
2607
2608 const bool expire_all = (_range.first == ANY_PORT
2609 && _range.second == ANY_PORT);
2610
2611 for (auto &s : get_services_remote()) {
2612 for (auto &i : s.second) {
2613 boost::asio::ip::address its_address;
2614 std::shared_ptr<client_endpoint> its_client_endpoint =
2615 std::dynamic_pointer_cast<client_endpoint>(
2616 i.second->get_endpoint(_reliable));
2617 if (!its_client_endpoint && expire_all) {
2618 its_client_endpoint = std::dynamic_pointer_cast<client_endpoint>(
2619 i.second->get_endpoint(!_reliable));
2620 }
2621 if (its_client_endpoint) {
2622 if ((expire_all || (its_client_endpoint->get_remote_port() >= _range.first
2623 && its_client_endpoint->get_remote_port() <= _range.second))
2624 && its_client_endpoint->get_remote_address(its_address)
2625 && its_address == _address) {
2626 if (discovery_) {
2627 discovery_->unsubscribe_all(s.first, i.first);
2628 }
2629 its_expired_offers[s.first].push_back(i.first);
2630 }
2631 }
2632 }
2633 }
2634
2635 for (auto &s : its_expired_offers) {
2636 for (auto &i : s.second) {
2637 VSOMEIP_INFO << "expire_services for address: " << _address
2638 << " : delete service/instance "
2639 << std::hex << std::setw(4) << std::setfill('0') << s.first
2640 << "." << std::hex << std::setw(4) << std::setfill('0') << i
2641 << " port [" << std::dec << _range.first << "," << _range.second
2642 << "] reliability=" << std::boolalpha << _reliable;
2643 del_routing_info(s.first, i, true, true);
2644 }
2645 }
2646 }
2647
2648 void
expire_subscriptions(const boost::asio::ip::address & _address)2649 routing_manager_impl::expire_subscriptions(
2650 const boost::asio::ip::address &_address) {
2651 expire_subscriptions(_address,
2652 configuration::port_range_t(ANY_PORT, ANY_PORT), false);
2653 }
2654
2655 void
expire_subscriptions(const boost::asio::ip::address & _address,std::uint16_t _port,bool _reliable)2656 routing_manager_impl::expire_subscriptions(
2657 const boost::asio::ip::address &_address, std::uint16_t _port,
2658 bool _reliable) {
2659 expire_subscriptions(_address, configuration::port_range_t(_port, _port),
2660 _reliable);
2661 }
2662
2663 void
expire_subscriptions(const boost::asio::ip::address & _address,const configuration::port_range_t & _range,bool _reliable)2664 routing_manager_impl::expire_subscriptions(
2665 const boost::asio::ip::address &_address,
2666 const configuration::port_range_t& _range, bool _reliable) {
2667 const bool expire_all = (_range.first == ANY_PORT
2668 && _range.second == ANY_PORT);
2669
2670 std::map<service_t,
2671 std::map<instance_t,
2672 std::map<eventgroup_t,
2673 std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
2674 {
2675 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
2676 its_eventgroups = eventgroups_;
2677 }
2678 for (const auto &its_service : its_eventgroups) {
2679 for (const auto &its_instance : its_service.second) {
2680 for (const auto &its_eventgroup : its_instance.second) {
2681 const auto its_info = its_eventgroup.second;
2682 for (auto its_subscription
2683 : its_info->get_remote_subscriptions()) {
2684 // Note: get_remote_subscription delivers a copied
2685 // set of subscriptions. Thus, its is possible to
2686 // to remove them within the loop.
2687 auto its_ep_definition = (_reliable) ?
2688 its_subscription->get_reliable() :
2689 its_subscription->get_unreliable();
2690
2691 if (!its_ep_definition && expire_all)
2692 its_ep_definition = (!_reliable) ?
2693 its_subscription->get_reliable() :
2694 its_subscription->get_unreliable();
2695
2696 if (its_ep_definition
2697 && its_ep_definition->get_address() == _address
2698 && (expire_all ||
2699 (its_ep_definition->get_remote_port() >= _range.first
2700 && its_ep_definition->get_remote_port() <= _range.second))) {
2701
2702 // TODO: Check whether subscriptions to different hosts are valid.
2703 // IF yes, we probably need to simply reset the corresponding
2704 // endpoint instead of removing the subscription...
2705 VSOMEIP_INFO << __func__
2706 << ": removing subscription to "
2707 << std::hex << its_info->get_service() << "."
2708 << std::hex << its_info->get_instance() << "."
2709 << std::hex << its_info->get_eventgroup()
2710 << " from target "
2711 << its_ep_definition->get_address() << ":"
2712 << std::dec << its_ep_definition->get_port()
2713 << " reliable="
2714 << std::boolalpha << its_ep_definition->is_reliable();
2715 if (expire_all) {
2716 its_ep_definition = (!its_ep_definition->is_reliable()) ?
2717 its_subscription->get_reliable() :
2718 its_subscription->get_unreliable();
2719 if (its_ep_definition) {
2720 VSOMEIP_INFO << __func__
2721 << ": removing subscription to "
2722 << std::hex << its_info->get_service() << "."
2723 << std::hex << its_info->get_instance() << "."
2724 << std::hex << its_info->get_eventgroup()
2725 << " from target "
2726 << its_ep_definition->get_address() << ":"
2727 << std::dec << its_ep_definition->get_port()
2728 << " reliable="
2729 << std::boolalpha << its_ep_definition->is_reliable();
2730 }
2731 }
2732 on_remote_unsubscribe(its_subscription);
2733 }
2734 }
2735 }
2736 }
2737 }
2738 }
2739
init_routing_info()2740 void routing_manager_impl::init_routing_info() {
2741 VSOMEIP_INFO<< "Service Discovery disabled. Using static routing information.";
2742 for (auto i : configuration_->get_remote_services()) {
2743 boost::asio::ip::address its_address(
2744 boost::asio::ip::address::from_string(
2745 configuration_->get_unicast_address(i.first, i.second)));
2746 uint16_t its_reliable_port
2747 = configuration_->get_reliable_port(i.first, i.second);
2748 uint16_t its_unreliable_port
2749 = configuration_->get_unreliable_port(i.first, i.second);
2750 major_version_t its_major
2751 = configuration_->get_major_version(i.first, i.second);
2752 minor_version_t its_minor
2753 = configuration_->get_minor_version(i.first, i.second);
2754 ttl_t its_ttl
2755 = configuration_->get_ttl(i.first, i.second);
2756
2757 if (its_reliable_port != ILLEGAL_PORT
2758 || its_unreliable_port != ILLEGAL_PORT) {
2759
2760 VSOMEIP_INFO << "Adding static remote service ["
2761 << std::hex << std::setw(4) << std::setfill('0')
2762 << i.first << "." << i.second
2763 << std::dec << ":" << +its_major << "." << its_minor
2764 << "]";
2765
2766 add_routing_info(i.first, i.second,
2767 its_major, its_minor, its_ttl,
2768 its_address, its_reliable_port,
2769 its_address, its_unreliable_port);
2770
2771 if(its_reliable_port != ILLEGAL_PORT) {
2772 ep_mgr_impl_->find_or_create_remote_client(
2773 i.first, i.second, true);
2774 }
2775 if(its_unreliable_port != ILLEGAL_PORT) {
2776 ep_mgr_impl_->find_or_create_remote_client(
2777 i.first, i.second, false);
2778 }
2779 }
2780 }
2781 }
2782
on_remote_subscribe(std::shared_ptr<remote_subscription> & _subscription,const remote_subscription_callback_t & _callback)2783 void routing_manager_impl::on_remote_subscribe(
2784 std::shared_ptr<remote_subscription> &_subscription,
2785 const remote_subscription_callback_t &_callback) {
2786 auto its_eventgroupinfo = _subscription->get_eventgroupinfo();
2787 if (!its_eventgroupinfo) {
2788 VSOMEIP_ERROR << __func__ << " eventgroupinfo is invalid";
2789 return;
2790 }
2791 const ttl_t its_ttl = _subscription->get_ttl();
2792
2793 const auto its_service = its_eventgroupinfo->get_service();
2794 const auto its_instance = its_eventgroupinfo->get_instance();
2795 const auto its_eventgroup = its_eventgroupinfo->get_eventgroup();
2796 const auto its_major = its_eventgroupinfo->get_major();
2797
2798 // Get remote port(s)
2799 auto its_reliable = _subscription->get_reliable();
2800 if (its_reliable) {
2801 uint16_t its_port
2802 = configuration_->get_reliable_port(its_service, its_instance);
2803 its_reliable->set_remote_port(its_port);
2804 }
2805
2806 auto its_unreliable = _subscription->get_unreliable();
2807 if (its_unreliable) {
2808 uint16_t its_port
2809 = configuration_->get_unreliable_port(its_service, its_instance);
2810 its_unreliable->set_remote_port(its_port);
2811 }
2812
2813 // Calculate expiration time
2814 const std::chrono::steady_clock::time_point its_expiration
2815 = std::chrono::steady_clock::now() + std::chrono::seconds(its_ttl);
2816
2817 // Try to update the subscription. This will fail, if the subscription does
2818 // not exist or is still (partly) pending.
2819 remote_subscription_id_t its_id;
2820 std::set<client_t> its_added;
2821 update_remote_subscription_mutex_.lock();
2822 auto its_result = its_eventgroupinfo->update_remote_subscription(
2823 _subscription, its_expiration, its_added, its_id, true);
2824 if (its_result) {
2825 if (!_subscription->is_pending()) { // resubscription without change
2826 update_remote_subscription_mutex_.unlock();
2827 _callback(_subscription);
2828 } else if (!its_added.empty()) { // new clients for a selective subscription
2829 const client_t its_offering_client
2830 = find_local_client(its_service, its_instance);
2831 send_subscription(its_offering_client,
2832 its_service, its_instance, its_eventgroup, its_major,
2833 its_added, _subscription->get_id());
2834 update_remote_subscription_mutex_.unlock();
2835 } else { // identical subscription is not yet processed
2836 std::stringstream its_warning;
2837 its_warning << __func__ << " a remote subscription is already pending ["
2838 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
2839 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
2840 << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"
2841 << " from ";
2842 if (its_reliable && its_unreliable)
2843 its_warning << "[";
2844 if (its_reliable)
2845 its_warning << its_reliable->get_address().to_string()
2846 << ":" << std::dec << its_reliable->get_port();
2847 if (its_reliable && its_unreliable)
2848 its_warning << ", ";
2849 if (its_unreliable)
2850 its_warning << its_unreliable->get_address().to_string()
2851 << ":" << std::dec << its_unreliable->get_port();
2852 if (its_reliable && its_unreliable)
2853 its_warning << "]";
2854 VSOMEIP_WARNING << its_warning.str();
2855
2856 update_remote_subscription_mutex_.unlock();
2857 _callback(_subscription);
2858 }
2859 } else { // new subscription
2860 if (its_eventgroupinfo->is_remote_subscription_limit_reached(
2861 _subscription)) {
2862 _subscription->set_all_client_states(
2863 remote_subscription_state_e::SUBSCRIPTION_NACKED);
2864
2865 update_remote_subscription_mutex_.unlock();
2866 _callback(_subscription);
2867 return;
2868 }
2869
2870 auto its_id
2871 = its_eventgroupinfo->add_remote_subscription(_subscription);
2872
2873 const client_t its_offering_client
2874 = find_local_client(its_service, its_instance);
2875 send_subscription(its_offering_client,
2876 its_service, its_instance, its_eventgroup, its_major,
2877 _subscription->get_clients(), its_id);
2878 update_remote_subscription_mutex_.unlock();
2879 }
2880 }
2881
on_remote_unsubscribe(std::shared_ptr<remote_subscription> & _subscription)2882 void routing_manager_impl::on_remote_unsubscribe(
2883 std::shared_ptr<remote_subscription> &_subscription) {
2884 std::shared_ptr<eventgroupinfo> its_info
2885 = _subscription->get_eventgroupinfo();
2886 if (!its_info) {
2887 VSOMEIP_ERROR << __func__
2888 << ": Received Unsubscribe for unregistered eventgroup.";
2889 return;
2890 }
2891
2892 const auto its_service = its_info->get_service();
2893 const auto its_instance = its_info->get_instance();
2894 const auto its_eventgroup = its_info->get_eventgroup();
2895 const auto its_major = its_info->get_major();
2896
2897 // Get remote port(s)
2898 auto its_reliable = _subscription->get_reliable();
2899 if (its_reliable) {
2900 uint16_t its_port
2901 = configuration_->get_reliable_port(its_service, its_instance);
2902 its_reliable->set_remote_port(its_port);
2903 }
2904
2905 auto its_unreliable = _subscription->get_unreliable();
2906 if (its_unreliable) {
2907 uint16_t its_port
2908 = configuration_->get_unreliable_port(its_service, its_instance);
2909 its_unreliable->set_remote_port(its_port);
2910 }
2911
2912 remote_subscription_id_t its_id(0);
2913 std::set<client_t> its_removed;
2914 update_remote_subscription_mutex_.lock();
2915 auto its_result = its_info->update_remote_subscription(
2916 _subscription, std::chrono::steady_clock::now(),
2917 its_removed, its_id, false);
2918
2919 if (its_result) {
2920 const client_t its_offering_client
2921 = find_local_client(its_service, its_instance);
2922 send_unsubscription(its_offering_client,
2923 its_service, its_instance, its_eventgroup, its_major,
2924 its_removed, its_id);
2925 }
2926
2927 update_remote_subscription_mutex_.unlock();
2928 }
2929
on_subscribe_ack_with_multicast(service_t _service,instance_t _instance,const boost::asio::ip::address & _sender,const boost::asio::ip::address & _address,uint16_t _port)2930 void routing_manager_impl::on_subscribe_ack_with_multicast(
2931 service_t _service, instance_t _instance,
2932 const boost::asio::ip::address &_sender,
2933 const boost::asio::ip::address &_address, uint16_t _port) {
2934 ep_mgr_impl_->find_or_create_multicast_endpoint(_service,
2935 _instance, _sender, _address, _port);
2936 }
2937
on_subscribe_ack(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,remote_subscription_id_t _id)2938 void routing_manager_impl::on_subscribe_ack(client_t _client,
2939 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
2940 event_t _event, remote_subscription_id_t _id) {
2941 std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
2942 auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
2943 if (its_eventgroup) {
2944 auto its_subscription = its_eventgroup->get_remote_subscription(_id);
2945 if (its_subscription) {
2946 its_subscription->set_client_state(_client,
2947 remote_subscription_state_e::SUBSCRIPTION_ACKED);
2948
2949 auto its_parent = its_subscription->get_parent();
2950 if (its_parent) {
2951 its_parent->set_client_state(_client,
2952 remote_subscription_state_e::SUBSCRIPTION_ACKED);
2953 if (!its_subscription->is_pending()) {
2954 its_eventgroup->remove_remote_subscription(_id);
2955 }
2956 }
2957
2958 if (discovery_) {
2959 std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
2960 remote_subscribers_[_service][_instance][VSOMEIP_ROUTING_CLIENT].insert(
2961 its_subscription->get_subscriber());
2962 discovery_->update_remote_subscription(its_subscription);
2963
2964 VSOMEIP_INFO << "REMOTE SUBSCRIBE("
2965 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
2966 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2967 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2968 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
2969 << " from " << its_subscription->get_subscriber()->get_address()
2970 << ":" << std::dec << its_subscription->get_subscriber()->get_port()
2971 << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable")
2972 << " was accepted";
2973
2974 return;
2975 }
2976 } else {
2977 const auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _client);
2978 const auto its_state = remote_subscription_state_.find(its_tuple);
2979 if (its_state != remote_subscription_state_.end()) {
2980 if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
2981 // Already notified!
2982 return;
2983 }
2984 }
2985 remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED;
2986 }
2987
2988 std::set<client_t> subscribed_clients;
2989 if (_client == VSOMEIP_ROUTING_CLIENT) {
2990 for (const auto &its_event : its_eventgroup->get_events()) {
2991 if (_event == ANY_EVENT || _event == its_event->get_event()) {
2992 const auto &its_subscribers = its_event->get_subscribers();
2993 subscribed_clients.insert(its_subscribers.begin(), its_subscribers.end());
2994 }
2995 }
2996 } else {
2997 subscribed_clients.insert(_client);
2998 }
2999
3000 for (const auto &its_subscriber : subscribed_clients) {
3001 if (its_subscriber == get_client()) {
3002 if (_event == ANY_EVENT) {
3003 for (const auto &its_event : its_eventgroup->get_events()) {
3004 host_->on_subscription_status(_service, _instance,
3005 _eventgroup, its_event->get_event(),
3006 0x0 /*OK*/);
3007 }
3008 } else {
3009 host_->on_subscription_status(_service, _instance,
3010 _eventgroup, _event, 0x0 /*OK*/);
3011 }
3012 } else {
3013 stub_->send_subscribe_ack(its_subscriber, _service,
3014 _instance, _eventgroup, _event);
3015 }
3016 }
3017 }
3018 }
3019
find_or_create_remote_client(service_t _service,instance_t _instance,bool _reliable)3020 std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client(
3021 service_t _service, instance_t _instance, bool _reliable) {
3022 return ep_mgr_impl_->find_or_create_remote_client(_service,
3023 _instance, _reliable);
3024 }
3025
on_subscribe_nack(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,remote_subscription_id_t _id,bool _simulated)3026 void routing_manager_impl::on_subscribe_nack(client_t _client,
3027 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
3028 event_t _event, remote_subscription_id_t _id, bool _simulated) {
3029 (void)_event; // TODO: Remove completely?
3030
3031 auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
3032 if (its_eventgroup) {
3033 auto its_subscription = its_eventgroup->get_remote_subscription(_id);
3034 if (its_subscription) {
3035 if (_simulated) {
3036 // method was called because a subscription for unoffered
3037 // service was received. Therefore, remove the remote_subscription
3038 // from the eventgroupinfo to ensure subsequent similar
3039 // subscriptions are handled like a new/unknown subscription
3040 its_eventgroup->remove_remote_subscription(_id);
3041 }
3042 its_subscription->set_client_state(_client,
3043 remote_subscription_state_e::SUBSCRIPTION_NACKED);
3044
3045 auto its_parent = its_subscription->get_parent();
3046 if (its_parent) {
3047 its_parent->set_client_state(_client,
3048 remote_subscription_state_e::SUBSCRIPTION_NACKED);
3049 if (!its_subscription->is_pending()) {
3050 its_eventgroup->remove_remote_subscription(_id);
3051 }
3052 }
3053
3054 if (discovery_) {
3055 discovery_->update_remote_subscription(its_subscription);
3056 VSOMEIP_INFO << "REMOTE SUBSCRIBE("
3057 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
3058 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3059 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
3060 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
3061 << " from " << its_subscription->get_subscriber()->get_address()
3062 << ":" << std::dec << its_subscription->get_subscriber()->get_port()
3063 << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable")
3064 << " was not accepted";
3065 }
3066 }
3067 }
3068 }
3069
check_error(const byte_t * _data,length_t _size,instance_t _instance)3070 return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _size,
3071 instance_t _instance) {
3072
3073 service_t its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
3074 _data[VSOMEIP_SERVICE_POS_MAX]);
3075
3076 if (_size >= VSOMEIP_PAYLOAD_POS) {
3077 if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])
3078 || utility::is_request_no_return(_data[VSOMEIP_MESSAGE_TYPE_POS]) ) {
3079 if (_data[VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
3080 VSOMEIP_WARNING << "Received a message with unsupported protocol version for service 0x"
3081 << std::hex << its_service;
3082 return return_code_e::E_WRONG_PROTOCOL_VERSION;
3083 }
3084 if (_instance == 0xFFFF) {
3085 VSOMEIP_WARNING << "Receiving endpoint is not configured for service 0x"
3086 << std::hex << its_service;
3087 return return_code_e::E_UNKNOWN_SERVICE;
3088 }
3089 // Check interface version of service/instance
3090 auto its_info = find_service(its_service, _instance);
3091 if (its_info) {
3092 major_version_t its_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
3093 if (its_version != its_info->get_major()) {
3094 VSOMEIP_WARNING << "Received a message with unsupported interface version for service 0x"
3095 << std::hex << its_service;
3096 return return_code_e::E_WRONG_INTERFACE_VERSION;
3097 }
3098 }
3099 if (_data[VSOMEIP_RETURN_CODE_POS] != static_cast<byte_t> (return_code_e::E_OK)) {
3100 // Request calls must to have return code E_OK set!
3101 VSOMEIP_WARNING << "Received a message with unsupported return code set for service 0x"
3102 << std::hex << its_service;
3103 return return_code_e::E_NOT_OK;
3104 }
3105 }
3106 } else {
3107 // Message shorter than vSomeIP message header
3108 VSOMEIP_WARNING << "Received a message message which is shorter than vSomeIP message header!";
3109 return return_code_e::E_MALFORMED_MESSAGE;
3110 }
3111 return return_code_e::E_OK;
3112 }
3113
send_error(return_code_e _return_code,const byte_t * _data,length_t _size,instance_t _instance,bool _reliable,endpoint * const _receiver,const boost::asio::ip::address & _remote_address,std::uint16_t _remote_port)3114 void routing_manager_impl::send_error(return_code_e _return_code,
3115 const byte_t *_data, length_t _size,
3116 instance_t _instance, bool _reliable,
3117 endpoint* const _receiver,
3118 const boost::asio::ip::address &_remote_address,
3119 std::uint16_t _remote_port) {
3120
3121 client_t its_client = 0;
3122 service_t its_service = 0;
3123 method_t its_method = 0;
3124 session_t its_session = 0;
3125 major_version_t its_version = 0;
3126
3127 if (_size >= VSOMEIP_CLIENT_POS_MAX) {
3128 its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
3129 _data[VSOMEIP_CLIENT_POS_MAX]);
3130 }
3131 if (_size >= VSOMEIP_SERVICE_POS_MAX) {
3132 its_service = VSOMEIP_BYTES_TO_WORD(
3133 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
3134 }
3135 if (_size >= VSOMEIP_METHOD_POS_MAX) {
3136 its_method = VSOMEIP_BYTES_TO_WORD(
3137 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
3138 }
3139 if (_size >= VSOMEIP_SESSION_POS_MAX) {
3140 its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
3141 _data[VSOMEIP_SESSION_POS_MAX]);
3142 }
3143 if( _size >= VSOMEIP_INTERFACE_VERSION_POS) {
3144 its_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
3145 }
3146
3147 auto error_message = runtime::get()->create_message(_reliable);
3148 error_message->set_client(its_client);
3149 error_message->set_instance(_instance);
3150 error_message->set_interface_version(its_version);
3151 error_message->set_message_type(message_type_e::MT_ERROR);
3152 error_message->set_method(its_method);
3153 error_message->set_return_code(_return_code);
3154 error_message->set_service(its_service);
3155 error_message->set_session(its_session);
3156 {
3157 std::shared_ptr<serializer> its_serializer(get_serializer());
3158 if (its_serializer->serialize(error_message.get())) {
3159 if (_receiver) {
3160 auto its_endpoint_def = std::make_shared<endpoint_definition>(
3161 _remote_address, _remote_port,
3162 _receiver->is_reliable());
3163 its_endpoint_def->set_remote_port(_receiver->get_local_port());
3164 std::shared_ptr<endpoint> its_endpoint =
3165 ep_mgr_impl_->find_server_endpoint(
3166 its_endpoint_def->get_remote_port(),
3167 its_endpoint_def->is_reliable());
3168 if (its_endpoint) {
3169 #ifdef USE_DLT
3170 const uint16_t its_data_size
3171 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
3172
3173 trace::header its_header;
3174 if (its_header.prepare(its_endpoint, true, _instance))
3175 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
3176 _data, its_data_size);
3177 #else
3178 (void) _instance;
3179 #endif
3180 its_endpoint->send_error(its_endpoint_def,
3181 its_serializer->get_data(), its_serializer->get_size());
3182 }
3183 }
3184 its_serializer->reset();
3185 put_serializer(its_serializer);
3186 } else {
3187 VSOMEIP_ERROR<< "Failed to serialize error message.";
3188 }
3189 }
3190 }
3191
clear_remote_subscriber(service_t _service,instance_t _instance,client_t _client,const std::shared_ptr<endpoint_definition> & _target)3192 void routing_manager_impl::clear_remote_subscriber(
3193 service_t _service, instance_t _instance, client_t _client,
3194 const std::shared_ptr<endpoint_definition> &_target) {
3195 std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
3196 auto its_service = remote_subscribers_.find(_service);
3197 if (its_service != remote_subscribers_.end()) {
3198 auto its_instance = its_service->second.find(_instance);
3199 if (its_instance != its_service->second.end()) {
3200 auto its_client = its_instance->second.find(_client);
3201 if (its_client != its_instance->second.end()) {
3202 if (its_client->second.erase(_target)) {
3203 if (!its_client->second.size()) {
3204 its_instance->second.erase(_client);
3205 }
3206 }
3207 }
3208 }
3209 }
3210 }
3211
3212 std::chrono::steady_clock::time_point
expire_subscriptions(bool _force)3213 routing_manager_impl::expire_subscriptions(bool _force) {
3214 std::map<service_t,
3215 std::map<instance_t,
3216 std::map<eventgroup_t,
3217 std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
3218 std::map<std::shared_ptr<remote_subscription>,
3219 std::set<client_t> > its_expired_subscriptions;
3220
3221 std::chrono::steady_clock::time_point now
3222 = std::chrono::steady_clock::now();
3223 std::chrono::steady_clock::time_point its_next_expiration
3224 = std::chrono::steady_clock::now() + std::chrono::hours(24);
3225 {
3226 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
3227 its_eventgroups = eventgroups_;
3228 }
3229
3230 for (auto &its_service : its_eventgroups) {
3231 for (auto &its_instance : its_service.second) {
3232 for (auto &its_eventgroup : its_instance.second) {
3233 auto its_subscriptions
3234 = its_eventgroup.second->get_remote_subscriptions();
3235 for (auto &s : its_subscriptions) {
3236 for (auto its_client : s->get_clients()) {
3237 if (_force) {
3238 its_expired_subscriptions[s].insert(its_client);
3239 } else {
3240 auto its_expiration = s->get_expiration(its_client);
3241 if (its_expiration != std::chrono::steady_clock::time_point()) {
3242 if (its_expiration < now) {
3243 its_expired_subscriptions[s].insert(its_client);
3244 } else if (its_expiration < its_next_expiration) {
3245 its_next_expiration = its_expiration;
3246 }
3247 }
3248 }
3249 }
3250 }
3251 }
3252 }
3253 }
3254
3255 for (auto &s : its_expired_subscriptions) {
3256 auto its_info = s.first->get_eventgroupinfo();
3257 if (its_info) {
3258 auto its_service = its_info->get_service();
3259 auto its_instance = its_info->get_instance();
3260 auto its_eventgroup = its_info->get_eventgroup();
3261
3262 remote_subscription_id_t its_id;
3263 update_remote_subscription_mutex_.lock();
3264 auto its_result = its_info->update_remote_subscription(
3265 s.first, std::chrono::steady_clock::now(),
3266 s.second, its_id, false);
3267 if (its_result) {
3268 const client_t its_offering_client
3269 = find_local_client(its_service, its_instance);
3270 const auto its_subscription = its_info->get_remote_subscription(its_id);
3271 if (its_subscription) {
3272 its_info->remove_remote_subscription(its_id);
3273
3274 std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
3275 remote_subscribers_[its_service][its_instance].erase(its_offering_client);
3276
3277 if (its_info->get_remote_subscriptions().size() == 0) {
3278 for (const auto &its_event : its_info->get_events()) {
3279 bool has_remote_subscriber(false);
3280 for (const auto &its_eventgroup : its_event->get_eventgroups()) {
3281 const auto its_eventgroup_info
3282 = find_eventgroup(its_service, its_instance, its_eventgroup);
3283 if (its_eventgroup_info
3284 && its_eventgroup_info->get_remote_subscriptions().size() > 0) {
3285 has_remote_subscriber = true;
3286 }
3287 }
3288 if (!has_remote_subscriber && its_event->is_shadow()) {
3289 its_event->unset_payload();
3290 }
3291 }
3292 }
3293 } else {
3294 VSOMEIP_ERROR << __func__
3295 << ": Unknown expired subscription " << std::dec << its_id << " for eventgroup ["
3296 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
3297 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
3298 << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
3299 }
3300 send_expired_subscription(its_offering_client,
3301 its_service, its_instance, its_eventgroup,
3302 s.second, s.first->get_id());
3303 }
3304 update_remote_subscription_mutex_.unlock();
3305
3306 if (s.first->get_unreliable()) {
3307 VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
3308 << std::hex << std::setfill('0') << std::setw(4) << its_service << "."
3309 << std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
3310 << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from "
3311 << s.first->get_unreliable()->get_address() << ":"
3312 << std::dec << s.first->get_unreliable()->get_port();
3313 }
3314
3315 if (s.first->get_reliable()) {
3316 VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
3317 << std::hex << std::setfill('0') << std::setw(4) << its_service << "."
3318 << std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
3319 << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from "
3320 << s.first->get_reliable()->get_address() << ":"
3321 << std::dec << s.first->get_reliable()->get_port();
3322 }
3323 }
3324 }
3325
3326 return its_next_expiration;
3327 }
3328
log_version_timer_cbk(boost::system::error_code const & _error)3329 void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const & _error) {
3330 if (!_error) {
3331
3332 #ifndef VSOMEIP_VERSION
3333 #define VSOMEIP_VERSION "unknown version"
3334 #endif
3335 static int its_counter(0);
3336 static uint32_t its_interval = configuration_->get_log_version_interval();
3337
3338 bool is_diag_mode(false);
3339
3340 if (discovery_) {
3341 is_diag_mode = discovery_->get_diagnosis_mode();
3342 }
3343 std::stringstream its_last_resume;
3344 {
3345 std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
3346 if (last_resume_ != std::chrono::steady_clock::time_point::min()) {
3347 its_last_resume << " | " << std::dec
3348 << std::chrono::duration_cast<std::chrono::seconds>(
3349 std::chrono::steady_clock::now() - last_resume_).count() << "s";
3350 }
3351 }
3352
3353 VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION << " | ("
3354 << ((is_diag_mode == true) ? "diagnosis)" : "default)")
3355 << its_last_resume.str();
3356
3357 its_counter++;
3358 if (its_counter == 6) {
3359 ep_mgr_->log_client_states();
3360 ep_mgr_impl_->log_client_states();
3361 its_counter = 0;
3362 }
3363
3364 {
3365 std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
3366 version_log_timer_.expires_from_now(std::chrono::seconds(its_interval));
3367 version_log_timer_.async_wait(
3368 std::bind(&routing_manager_impl::log_version_timer_cbk,
3369 this, std::placeholders::_1));
3370 }
3371 }
3372 }
3373
handle_local_offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)3374 bool routing_manager_impl::handle_local_offer_service(client_t _client, service_t _service,
3375 instance_t _instance, major_version_t _major,minor_version_t _minor) {
3376 {
3377 std::lock_guard<std::mutex> its_lock(local_services_mutex_);
3378 auto found_service = local_services_.find(_service);
3379 if (found_service != local_services_.end()) {
3380 auto found_instance = found_service->second.find(_instance);
3381 if (found_instance != found_service->second.end()) {
3382 const major_version_t its_stored_major(std::get<0>(found_instance->second));
3383 const minor_version_t its_stored_minor(std::get<1>(found_instance->second));
3384 const client_t its_stored_client(std::get<2>(found_instance->second));
3385 if ( its_stored_major == _major
3386 && its_stored_minor == _minor
3387 && its_stored_client == _client) {
3388 VSOMEIP_WARNING << "routing_manager_impl::handle_local_offer_service: "
3389 << "Application: " << std::hex << std::setfill('0')
3390 << std::setw(4) << _client << " is offering: ["
3391 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3392 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3393 << std::dec << static_cast<std::uint32_t>(_major) << "."
3394 << _minor << "] offered previously by itself.";
3395 return false;
3396 } else if ( its_stored_major == _major
3397 && its_stored_minor == _minor
3398 && its_stored_client != _client) {
3399 // check if previous offering application is still alive
3400 bool already_pinged(false);
3401 {
3402 std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3403 auto found_service2 = pending_offers_.find(_service);
3404 if (found_service2 != pending_offers_.end()) {
3405 auto found_instance2 = found_service2->second.find(_instance);
3406 if (found_instance2 != found_service2->second.end()) {
3407 if(std::get<2>(found_instance2->second) == _client) {
3408 already_pinged = true;
3409 } else {
3410 VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3411 << "rejecting service registration. Application: "
3412 << std::hex << std::setfill('0') << std::setw(4)
3413 << _client << " is trying to offer ["
3414 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3415 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3416 << std::dec << static_cast<std::uint32_t>(_major) << "."
3417 << std::dec << _minor
3418 << "] current pending offer by application: " << std::hex
3419 << std::setfill('0') << std::setw(4)
3420 << its_stored_client << ": ["
3421 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3422 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3423 << std::dec << static_cast<std::uint32_t>(its_stored_major)
3424 << "." << its_stored_minor << "]";
3425 return false;
3426 }
3427 }
3428 }
3429 }
3430 if (!already_pinged) {
3431 // find out endpoint of previously offering application
3432 std::shared_ptr<local_client_endpoint_base_impl>
3433 its_old_endpoint
3434 = std::dynamic_pointer_cast<local_client_endpoint_base_impl>(
3435 find_local(its_stored_client));
3436 if (its_old_endpoint) {
3437 std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3438 if(stub_->send_ping(its_stored_client)) {
3439 pending_offers_[_service][_instance] =
3440 std::make_tuple(_major, _minor, _client,
3441 its_stored_client);
3442 VSOMEIP_WARNING << "OFFER("
3443 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
3444 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3445 << std::hex << std::setw(4) << std::setfill('0') << _instance
3446 << ":" << std::dec << int(_major) << "." << std::dec << _minor
3447 << "] is now pending. Waiting for pong from application: "
3448 << std::hex << std::setw(4) << std::setfill('0') << its_stored_client;
3449 return false;
3450 }
3451 } else if (its_stored_client == host_->get_client()) {
3452 VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3453 << "rejecting service registration. Application: "
3454 << std::hex << std::setfill('0') << std::setw(4)
3455 << _client << " is trying to offer ["
3456 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3457 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3458 << std::dec << static_cast<std::uint32_t>(_major) << "."
3459 << std::dec << _minor
3460 << "] offered previously by routing manager stub itself with application: "
3461 << std::hex << std::setfill('0') << std::setw(4)
3462 << its_stored_client << ": ["
3463 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3464 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3465 << std::dec << static_cast<std::uint32_t>(its_stored_major)
3466 << "." << its_stored_minor << "] which is still alive";
3467 return false;
3468 }
3469 } else {
3470 return false;
3471 }
3472 } else {
3473 VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3474 << "rejecting service registration. Application: "
3475 << std::hex << std::setfill('0') << std::setw(4)
3476 << _client << " is trying to offer ["
3477 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3478 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3479 << std::dec << static_cast<std::uint32_t>(_major) << "."
3480 << std::dec << _minor
3481 << "] offered previously by application: " << std::hex
3482 << std::setfill('0') << std::setw(4)
3483 << its_stored_client << ": ["
3484 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3485 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3486 << std::dec << static_cast<std::uint32_t>(its_stored_major)
3487 << "." << its_stored_minor << "]";
3488 return false;
3489 }
3490 }
3491 }
3492
3493 // check if the same service instance is already offered remotely
3494 if (routing_manager_base::offer_service(_client, _service, _instance,
3495 _major, _minor)) {
3496 local_services_[_service][_instance] = std::make_tuple(_major,
3497 _minor, _client);
3498 } else {
3499 VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3500 << "rejecting service registration. Application: "
3501 << std::hex << std::setfill('0') << std::setw(4)
3502 << _client << " is trying to offer ["
3503 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3504 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3505 << std::dec << static_cast<std::uint32_t>(_major) << "."
3506 << std::dec << _minor << "]"
3507 << "] already offered remotely";
3508 return false;
3509 }
3510 }
3511 return true;
3512 }
3513
on_pong(client_t _client)3514 void routing_manager_impl::on_pong(client_t _client) {
3515 std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3516 if (pending_offers_.size() == 0) {
3517 return;
3518 }
3519 for (auto service_iter = pending_offers_.begin();
3520 service_iter != pending_offers_.end(); ) {
3521 for (auto instance_iter = service_iter->second.begin();
3522 instance_iter != service_iter->second.end(); ) {
3523 if (std::get<3>(instance_iter->second) == _client) {
3524 // received pong from an application were another application wants
3525 // to offer its service, delete the other applications offer as
3526 // the current offering application is still alive
3527 VSOMEIP_WARNING << "OFFER("
3528 << std::hex << std::setw(4) << std::setfill('0')
3529 << std::get<2>(instance_iter->second) <<"): ["
3530 << std::hex << std::setw(4) << std::setfill('0')
3531 << service_iter->first << "."
3532 << std::hex << std::setw(4) << std::setfill('0')
3533 << instance_iter->first << ":" << std::dec
3534 << std::uint32_t(std::get<0>(instance_iter->second))
3535 << "." << std::dec << std::get<1>(instance_iter->second)
3536 << "] was rejected as application: "
3537 << std::hex << std::setw(4) << std::setfill('0') << _client
3538 << " is still alive";
3539 instance_iter = service_iter->second.erase(instance_iter);
3540 } else {
3541 ++instance_iter;
3542 }
3543 }
3544
3545 if (service_iter->second.size() == 0) {
3546 service_iter = pending_offers_.erase(service_iter);
3547 } else {
3548 ++service_iter;
3549 }
3550 }
3551 }
3552
register_client_error_handler(client_t _client,const std::shared_ptr<endpoint> & _endpoint)3553 void routing_manager_impl::register_client_error_handler(client_t _client,
3554 const std::shared_ptr<endpoint> &_endpoint) {
3555 _endpoint->register_error_handler(
3556 std::bind(&routing_manager_impl::handle_client_error, this, _client));
3557 }
3558
handle_client_error(client_t _client)3559 void routing_manager_impl::handle_client_error(client_t _client) {
3560 VSOMEIP_INFO << "Client 0x" << std::hex << get_client()
3561 << " handles a client error(" << std::hex << _client << ")";
3562 if (stub_)
3563 stub_->update_registration(_client, registration_type_e::DEREGISTER_ON_ERROR);
3564
3565 std::forward_list<std::tuple<client_t, service_t, instance_t, major_version_t,
3566 minor_version_t>> its_offers;
3567 {
3568 std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3569 if (pending_offers_.size() == 0) {
3570 return;
3571 }
3572
3573 for (auto service_iter = pending_offers_.begin();
3574 service_iter != pending_offers_.end(); ) {
3575 for (auto instance_iter = service_iter->second.begin();
3576 instance_iter != service_iter->second.end(); ) {
3577 if (std::get<3>(instance_iter->second) == _client) {
3578 VSOMEIP_WARNING << "OFFER("
3579 << std::hex << std::setw(4) << std::setfill('0')
3580 << std::get<2>(instance_iter->second) <<"): ["
3581 << std::hex << std::setw(4) << std::setfill('0')
3582 << service_iter->first << "."
3583 << std::hex << std::setw(4) << std::setfill('0')
3584 << instance_iter->first << ":" << std::dec
3585 << std::uint32_t(std::get<0>(instance_iter->second))
3586 << "." << std::dec << std::get<1>(instance_iter->second)
3587 << "] is not pending anymore as application: "
3588 << std::hex << std::setw(4) << std::setfill('0')
3589 << std::get<3>(instance_iter->second)
3590 << " is dead. Offering again!";
3591 its_offers.push_front(std::make_tuple(
3592 std::get<2>(instance_iter->second),
3593 service_iter->first,
3594 instance_iter->first,
3595 std::get<0>(instance_iter->second),
3596 std::get<1>(instance_iter->second)));
3597 instance_iter = service_iter->second.erase(instance_iter);
3598 } else {
3599 ++instance_iter;
3600 }
3601 }
3602
3603 if (service_iter->second.size() == 0) {
3604 service_iter = pending_offers_.erase(service_iter);
3605 } else {
3606 ++service_iter;
3607 }
3608 }
3609 }
3610 for (const auto &offer : its_offers) {
3611 offer_service(std::get<0>(offer), std::get<1>(offer), std::get<2>(offer),
3612 std::get<3>(offer), std::get<4>(offer), true);
3613 }
3614 }
3615
get_endpoint_manager() const3616 std::shared_ptr<endpoint_manager_impl> routing_manager_impl::get_endpoint_manager() const {
3617 return ep_mgr_impl_;
3618 }
3619
send_subscribe(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,event_t _event)3620 void routing_manager_impl::send_subscribe(client_t _client, service_t _service,
3621 instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
3622 event_t _event) {
3623 auto endpoint = ep_mgr_->find_local(_service, _instance);
3624 if (endpoint) {
3625 stub_->send_subscribe(endpoint, _client,
3626 _service, _instance, _eventgroup, _major, _event, PENDING_SUBSCRIPTION_ID);
3627 }
3628 }
3629
set_routing_state(routing_state_e _routing_state)3630 void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
3631
3632 // Ignore setting to the current routing state
3633 {
3634 std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
3635 if (routing_state_ == _routing_state) {
3636 VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing.";
3637 return;
3638 }
3639
3640 routing_state_ = _routing_state;
3641 }
3642
3643 if(discovery_) {
3644 switch (_routing_state) {
3645 case routing_state_e::RS_SUSPENDED:
3646 {
3647 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode, diagnosis mode is "
3648 << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3649
3650 // stop processing of incoming SD messages
3651 discovery_->stop();
3652
3653 VSOMEIP_INFO << "rmi::" << __func__ << " Inform all applications that we are going to suspend";
3654 send_suspend();
3655
3656 // remove all remote subscriptions to remotely offered services on this node
3657 expire_subscriptions(true);
3658
3659 // send StopOffer messages for remotely offered services on this node
3660 for (const auto &its_service : get_offered_services()) {
3661 for (const auto &its_instance : its_service.second) {
3662 if (its_instance.second->get_endpoint(true) || its_instance.second->get_endpoint(false)) {
3663 const client_t its_client(find_local_client(its_service.first, its_instance.first));
3664 VSOMEIP_WARNING << "service "
3665 << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
3666 << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << " still offered by "
3667 << std::hex << std::setw(4) << std::setfill('0') << its_client;
3668 }
3669 discovery_->stop_offer_service(its_instance.second);
3670 }
3671 }
3672 {
3673 std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
3674 remote_subscription_state_.clear();
3675 }
3676
3677 // send StopSubscribes and clear subscribed_ map
3678 discovery_->unsubscribe_all_on_suspend();
3679
3680 // mark all external services as offline
3681 services_t its_remote_services;
3682 {
3683 std::lock_guard<std::mutex> its_lock(services_remote_mutex_);
3684 its_remote_services = services_remote_;
3685 }
3686 for (const auto &s : its_remote_services) {
3687 for (const auto &i : s.second) {
3688 const bool has_reliable(i.second->get_endpoint(true));
3689 const bool has_unreliable(i.second->get_endpoint(false));
3690 del_routing_info(s.first, i.first, has_reliable, has_unreliable);
3691
3692 // clear all cached payloads of remote services
3693 unset_all_eventpayloads(s.first, i.first);
3694 }
3695 }
3696
3697 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode done, diagnosis mode is "
3698 << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3699
3700 break;
3701 }
3702 case routing_state_e::RS_RESUMED:
3703 {
3704 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode, diagnosis mode was "
3705 << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3706 {
3707 std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
3708 last_resume_ = std::chrono::steady_clock::now();
3709 }
3710
3711 // Reset relevant in service info
3712 for (const auto &its_service : get_offered_services()) {
3713 for (const auto &its_instance : its_service.second) {
3714 its_instance.second->set_ttl(DEFAULT_TTL);
3715 its_instance.second->set_is_in_mainphase(false);
3716 }
3717 }
3718 // Switch SD back to normal operation
3719 discovery_->set_diagnosis_mode(false);
3720
3721 if (routing_state_handler_) {
3722 routing_state_handler_(_routing_state);
3723 }
3724
3725 // start processing of SD messages (incoming remote offers should lead to new subscribe messages)
3726 discovery_->start();
3727
3728 // Trigger initial offer phase for relevant services
3729 for (const auto &its_service : get_offered_services()) {
3730 for (const auto &its_instance : its_service.second) {
3731 discovery_->offer_service(its_instance.second);
3732 }
3733 }
3734
3735 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode done, diagnosis mode was "
3736 << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3737 break;
3738 }
3739 case routing_state_e::RS_DIAGNOSIS:
3740 {
3741 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode.";
3742 discovery_->set_diagnosis_mode(true);
3743
3744 // send StopOffer messages for all someip protocol services
3745 for (const auto &its_service : get_offered_services()) {
3746 for (const auto &its_instance : its_service.second) {
3747 if (host_->get_configuration()->is_someip(
3748 its_service.first, its_instance.first)) {
3749 discovery_->stop_offer_service(its_instance.second);
3750 }
3751 }
3752 }
3753
3754 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode done.";
3755 break;
3756 }
3757 case routing_state_e::RS_RUNNING:
3758 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode, diagnosis mode was "
3759 << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3760
3761 // Reset relevant in service info
3762 for (const auto &its_service : get_offered_services()) {
3763 for (const auto &its_instance : its_service.second) {
3764 if (host_->get_configuration()->is_someip(
3765 its_service.first, its_instance.first)) {
3766 its_instance.second->set_ttl(DEFAULT_TTL);
3767 its_instance.second->set_is_in_mainphase(false);
3768 }
3769 }
3770 }
3771 // Switch SD back to normal operation
3772 discovery_->set_diagnosis_mode(false);
3773
3774 // Trigger initial phase for relevant services
3775 for (const auto &its_service : get_offered_services()) {
3776 for (const auto &its_instance : its_service.second) {
3777 if (host_->get_configuration()->is_someip(
3778 its_service.first, its_instance.first)) {
3779 discovery_->offer_service(its_instance.second);
3780 }
3781 }
3782 }
3783
3784 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode done, diagnosis mode was "
3785 << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3786 break;
3787 default:
3788 break;
3789 }
3790 }
3791 }
3792
on_net_interface_or_route_state_changed(bool _is_interface,std::string _if,bool _available)3793 void routing_manager_impl::on_net_interface_or_route_state_changed(
3794 bool _is_interface, std::string _if, bool _available) {
3795 std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
3796 auto log_change_message = [&_if, _available, _is_interface](bool _warning) {
3797 std::stringstream ss;
3798 ss << (_is_interface ? "Network interface" : "Route") << " \"" << _if
3799 << "\" state changed: " << (_available ? "up" : "down");
3800 if (_warning) {
3801 VSOMEIP_WARNING << ss.str();
3802 } else {
3803 VSOMEIP_INFO << ss.str();
3804 }
3805 };
3806 if (_is_interface) {
3807 if (if_state_running_
3808 || (_available && !if_state_running_ && routing_running_)) {
3809 log_change_message(true);
3810 } else if (!if_state_running_) {
3811 log_change_message(false);
3812 }
3813 if (_available && !if_state_running_) {
3814 if_state_running_ = true;
3815 if (!routing_running_) {
3816 if(configuration_->is_sd_enabled()) {
3817 if (sd_route_set_) {
3818 start_ip_routing();
3819 }
3820 } else {
3821 // Static routing, don't wait for route!
3822 start_ip_routing();
3823 }
3824 }
3825 }
3826 } else {
3827 if (sd_route_set_
3828 || (_available && !sd_route_set_ && routing_running_)) {
3829 log_change_message(true);
3830 } else if (!sd_route_set_) {
3831 log_change_message(false);
3832 }
3833 if (_available && !sd_route_set_) {
3834 sd_route_set_ = true;
3835 if (!routing_running_) {
3836 if (if_state_running_) {
3837 start_ip_routing();
3838 }
3839 }
3840 }
3841 }
3842 }
3843
start_ip_routing()3844 void routing_manager_impl::start_ip_routing() {
3845 #ifdef _WIN32
3846 if_state_running_ = true;
3847 #endif
3848
3849 if (routing_ready_handler_) {
3850 routing_ready_handler_();
3851 }
3852 if (discovery_) {
3853 discovery_->start();
3854 } else {
3855 init_routing_info();
3856 }
3857
3858 for (auto its_service : pending_sd_offers_) {
3859 init_service_info(its_service.first, its_service.second, true);
3860 }
3861 pending_sd_offers_.clear();
3862
3863 routing_running_ = true;
3864 VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
3865 }
3866
requested_service_add(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)3867 void routing_manager_impl::requested_service_add(client_t _client,
3868 service_t _service,
3869 instance_t _instance,
3870 major_version_t _major,
3871 minor_version_t _minor) {
3872 std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
3873 requested_services_[_client][_service][_instance].insert({ _major, _minor });
3874 }
3875
requested_service_remove(client_t _client,service_t _service,instance_t _instance)3876 void routing_manager_impl::requested_service_remove(client_t _client,
3877 service_t _service,
3878 instance_t _instance) {
3879 std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
3880 auto found_client = requested_services_.find(_client);
3881 if (found_client != requested_services_.end()) {
3882 auto found_service = found_client->second.find(_service);
3883 if (found_service != found_client->second.end()) {
3884 auto found_instance = found_service->second.find(_instance);
3885 if (found_instance != found_service->second.end()) {
3886 // delete all requested major/minor versions
3887 found_service->second.erase(_instance);
3888 if (!found_service->second.size()) {
3889 found_client->second.erase(_service);
3890 if (!found_client->second.size()) {
3891 requested_services_.erase(_client);
3892 }
3893 }
3894 }
3895 }
3896 }
3897 }
3898
3899 std::set<eventgroup_t>
get_subscribed_eventgroups(service_t _service,instance_t _instance)3900 routing_manager_impl::get_subscribed_eventgroups(
3901 service_t _service, instance_t _instance) {
3902 std::set<eventgroup_t> its_eventgroups;
3903
3904 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
3905 auto found_service = eventgroups_.find(_service);
3906 if (found_service != eventgroups_.end()) {
3907 auto found_instance = found_service->second.find(_instance);
3908 if (found_instance != found_service->second.end()) {
3909 for (const auto& its_group : found_instance->second) {
3910 for (const auto& its_event : its_group.second->get_events()) {
3911 if (its_event->has_subscriber(its_group.first, ANY_CLIENT)) {
3912 its_eventgroups.insert(its_group.first);
3913 }
3914 }
3915 }
3916 }
3917 }
3918
3919 return its_eventgroups;
3920 }
3921
clear_targets_and_pending_sub_from_eventgroups(service_t _service,instance_t _instance)3922 void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups(
3923 service_t _service, instance_t _instance) {
3924 std::vector<std::shared_ptr<event>> its_events;
3925 {
3926 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
3927 auto found_service = eventgroups_.find(_service);
3928 if (found_service != eventgroups_.end()) {
3929 auto found_instance = found_service->second.find(_instance);
3930 if (found_instance != found_service->second.end()) {
3931 for (const auto &its_eventgroup : found_instance->second) {
3932 // As the service is gone, all subscriptions to its events
3933 // do no longer exist and the last received payload is no
3934 // longer valid.
3935 for (auto &its_event : its_eventgroup.second->get_events()) {
3936 const auto its_subscribers = its_event->get_subscribers();
3937 for (const auto& its_subscriber : its_subscribers) {
3938 if (its_subscriber != get_client()) {
3939 its_event->remove_subscriber(
3940 its_eventgroup.first, its_subscriber);
3941 }
3942
3943 client_t its_client = VSOMEIP_ROUTING_CLIENT; //is_specific_endpoint_client(its_subscriber, _service, _instance);
3944 {
3945 std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
3946 const auto its_tuple =
3947 std::make_tuple(found_service->first, found_instance->first,
3948 its_eventgroup.first, its_client);
3949 remote_subscription_state_.erase(its_tuple);
3950 }
3951 }
3952 its_events.push_back(its_event);
3953 }
3954 // TODO dn: find out why this was commented out
3955 //its_eventgroup.second->clear_targets();
3956 //its_eventgroup.second->clear_pending_subscriptions();
3957 }
3958 }
3959 }
3960 }
3961 for (const auto& e : its_events) {
3962 e->unset_payload(true);
3963 }
3964 }
3965
clear_remote_subscriber(service_t _service,instance_t _instance)3966 void routing_manager_impl::clear_remote_subscriber(service_t _service,
3967 instance_t _instance) {
3968 std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
3969 auto found_service = remote_subscribers_.find(_service);
3970 if (found_service != remote_subscribers_.end()) {
3971 if (found_service->second.erase(_instance) > 0 &&
3972 !found_service->second.size()) {
3973 remote_subscribers_.erase(found_service);
3974 }
3975 }
3976 }
3977
3978
call_sd_endpoint_connected(const boost::system::error_code & _error,service_t _service,instance_t _instance,const std::shared_ptr<endpoint> & _endpoint,std::shared_ptr<boost::asio::steady_timer> _timer)3979 void routing_manager_impl::call_sd_endpoint_connected(
3980 const boost::system::error_code& _error,
3981 service_t _service, instance_t _instance,
3982 const std::shared_ptr<endpoint>& _endpoint,
3983 std::shared_ptr<boost::asio::steady_timer> _timer) {
3984 (void)_timer;
3985 if (_error) {
3986 return;
3987 }
3988 _endpoint->set_established(true);
3989 if (discovery_) {
3990 discovery_->on_endpoint_connected(_service, _instance,
3991 _endpoint);
3992 }
3993 }
3994
create_placeholder_event_and_subscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,client_t _client)3995 bool routing_manager_impl::create_placeholder_event_and_subscribe(
3996 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
3997 event_t _event, client_t _client) {
3998 bool is_inserted(false);
3999 // we received a event which was not yet requested/offered
4000 // create a placeholder field until someone requests/offers this event with
4001 // full information like eventgroup, field or not etc.
4002 std::set<eventgroup_t> its_eventgroups({_eventgroup});
4003
4004 const client_t its_local_client(find_local_client(_service, _instance));
4005 if (its_local_client == host_->get_client()) {
4006 // received subscription for event of a service instance hosted by
4007 // application acting as rm_impl register with own client id and shadow = false
4008 register_event(host_->get_client(),
4009 _service, _instance,
4010 _event,
4011 its_eventgroups, event_type_e::ET_UNKNOWN, reliability_type_e::RT_UNKNOWN,
4012 std::chrono::milliseconds::zero(), false, true,
4013 nullptr, false, false, true);
4014 } else if (its_local_client != VSOMEIP_ROUTING_CLIENT) {
4015 // received subscription for event of a service instance hosted on
4016 // this node register with client id of local_client and set shadow to true
4017 register_event(its_local_client,
4018 _service, _instance,
4019 _event, its_eventgroups, event_type_e::ET_UNKNOWN,
4020 reliability_type_e::RT_UNKNOWN,
4021 std::chrono::milliseconds::zero(), false, true,
4022 nullptr, false, true, true);
4023 } else {
4024 // received subscription for event of a unknown or remote service instance
4025 std::shared_ptr<serviceinfo> its_info = find_service(_service,
4026 _instance);
4027 if (its_info && !its_info->is_local()) {
4028 // remote service, register shadow event with client ID of subscriber
4029 // which should have called register_event
4030 register_event(_client,
4031 _service, _instance,
4032 _event, its_eventgroups, event_type_e::ET_UNKNOWN,
4033 reliability_type_e::RT_UNKNOWN,
4034 std::chrono::milliseconds::zero(),
4035 false, true, nullptr, false, true, true);
4036 } else {
4037 VSOMEIP_WARNING
4038 << "routing_manager_impl::create_placeholder_event_and_subscribe("
4039 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
4040 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4041 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
4042 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
4043 << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
4044 << " received subscription for unknown service instance.";
4045 }
4046 }
4047
4048 std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
4049 if (its_event) {
4050 is_inserted = its_event->add_subscriber(_eventgroup, _client, false);
4051 }
4052 return is_inserted;
4053 }
4054
handle_subscription_state(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)4055 void routing_manager_impl::handle_subscription_state(
4056 client_t _client, service_t _service, instance_t _instance,
4057 eventgroup_t _eventgroup, event_t _event) {
4058 #if 0
4059 VSOMEIP_ERROR << "routing_manager_impl::" << __func__
4060 << "(" << std::hex << _client << "): "
4061 << "event="
4062 << std::hex << _service << "."
4063 << std::hex << _instance << "."
4064 << std::hex << _eventgroup << "."
4065 << std::hex << _event
4066 << " me="
4067 << std::hex << get_client();
4068 #endif
4069 // Note: remote_subscription_state_mutex_ is already locked as this
4070 // method builds a critical section together with insert_subscription
4071 // from routing_manager_base.
4072 // Todo: Improve this situation...
4073 auto its_event = find_event(_service, _instance, _event);
4074 client_t its_client(VSOMEIP_ROUTING_CLIENT);
4075 if (its_event &&
4076 its_event->get_type() == event_type_e::ET_SELECTIVE_EVENT) {
4077 its_client = _client;
4078 }
4079
4080 auto its_tuple
4081 = std::make_tuple(_service, _instance, _eventgroup, its_client);
4082 auto its_state = remote_subscription_state_.find(its_tuple);
4083 if (its_state != remote_subscription_state_.end()) {
4084 #if 0
4085 VSOMEIP_ERROR << "routing_manager_impl::" << __func__
4086 << "(" << std::hex << _client << "): "
4087 << "event="
4088 << std::hex << _service << "."
4089 << std::hex << _instance << "."
4090 << std::hex << _eventgroup << "."
4091 << std::hex << _event
4092 << " state=" << std::hex << (int)its_state->second
4093 << " me="
4094 << std::hex << get_client();
4095 #endif
4096 if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
4097 // Subscription already acknowledged!
4098 if (_client == get_client()) {
4099 host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x0 /*OK*/);
4100 } else {
4101 stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event);
4102 }
4103 }
4104 }
4105 }
4106
register_sd_acceptance_handler(const sd_acceptance_handler_t & _handler) const4107 void routing_manager_impl::register_sd_acceptance_handler(
4108 const sd_acceptance_handler_t& _handler) const {
4109 if (discovery_) {
4110 discovery_->register_sd_acceptance_handler(_handler);
4111 }
4112 }
4113
register_reboot_notification_handler(const reboot_notification_handler_t & _handler) const4114 void routing_manager_impl::register_reboot_notification_handler(
4115 const reboot_notification_handler_t& _handler) const {
4116 if (discovery_) {
4117 discovery_->register_reboot_notification_handler(_handler);
4118 }
4119 }
4120
register_routing_ready_handler(const routing_ready_handler_t & _handler)4121 void routing_manager_impl::register_routing_ready_handler(
4122 const routing_ready_handler_t& _handler) {
4123 routing_ready_handler_ = _handler;
4124 }
4125
register_routing_state_handler(const routing_state_handler_t & _handler)4126 void routing_manager_impl::register_routing_state_handler(
4127 const routing_state_handler_t& _handler) {
4128 routing_state_handler_ = _handler;
4129 }
4130
sd_acceptance_enabled(const boost::asio::ip::address & _address,const configuration::port_range_t & _range,bool _reliable)4131 void routing_manager_impl::sd_acceptance_enabled(
4132 const boost::asio::ip::address& _address,
4133 const configuration::port_range_t& _range, bool _reliable) {
4134 expire_subscriptions(_address, _range, _reliable);
4135 expire_services(_address, _range, _reliable);
4136 }
4137
memory_log_timer_cbk(boost::system::error_code const & _error)4138 void routing_manager_impl::memory_log_timer_cbk(
4139 boost::system::error_code const & _error) {
4140 if (_error) {
4141 return;
4142 }
4143 #ifndef _WIN32
4144 static const std::uint32_t its_pagesize = static_cast<std::uint32_t>(getpagesize() / 1024);
4145 #else
4146 static const std::uint32_t its_pagesize = 4096 / 1024;
4147 #endif
4148 std::FILE *its_file = std::fopen("/proc/self/statm", "r");
4149 if (!its_file) {
4150 VSOMEIP_ERROR << "memory_log_timer_cbk: couldn't open:"
4151 << std::string(std::strerror(errno));
4152 return;
4153 }
4154 std::uint64_t its_size(0);
4155 std::uint64_t its_rsssize(0);
4156 std::uint64_t its_sharedpages(0);
4157 std::uint64_t its_text(0);
4158 std::uint64_t its_lib(0);
4159 std::uint64_t its_data(0);
4160 std::uint64_t its_dirtypages(0);
4161
4162 if (EOF == std::fscanf(its_file, "%lu %lu %lu %lu %lu %lu %lu", &its_size,
4163 &its_rsssize, &its_sharedpages, &its_text, &its_lib,
4164 &its_data, &its_dirtypages)) {
4165 VSOMEIP_ERROR<< "memory_log_timer_cbk: error reading:"
4166 << std::string(std::strerror(errno));
4167 }
4168 std::fclose(its_file);
4169 #ifndef _WIN32
4170 struct timespec cputs, monots;
4171 clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cputs);
4172 clock_gettime(CLOCK_MONOTONIC, &monots);
4173 #endif
4174
4175 VSOMEIP_INFO << "memory usage: "
4176 << "VmSize " << std::dec << its_size * its_pagesize << " kB, "
4177 << "VmRSS " << std::dec << its_rsssize * its_pagesize << " kB, "
4178 << "shared pages " << std::dec << its_sharedpages * its_pagesize << " kB, "
4179 << "text " << std::dec << its_text * its_pagesize << " kB, "
4180 << "data " << std::dec << its_data * its_pagesize << " kB "
4181 #ifndef _WIN32
4182 << "| monotonic time: " << std::dec << monots.tv_sec << "."
4183 << std::dec << monots.tv_nsec << " cpu time: "
4184 << std::dec << cputs.tv_sec << "." << std::dec << cputs.tv_nsec
4185 #endif
4186 ;
4187
4188 {
4189 std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_);
4190 boost::system::error_code ec;
4191 memory_log_timer_.expires_from_now(std::chrono::seconds(
4192 configuration_->get_log_memory_interval()), ec);
4193 memory_log_timer_.async_wait(
4194 std::bind(&routing_manager_impl::memory_log_timer_cbk, this,
4195 std::placeholders::_1));
4196 }
4197 }
4198
status_log_timer_cbk(boost::system::error_code const & _error)4199 void routing_manager_impl::status_log_timer_cbk(
4200 boost::system::error_code const & _error) {
4201 if (_error) {
4202 return;
4203 }
4204
4205 ep_mgr_impl_->print_status();
4206 {
4207 std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_);
4208 boost::system::error_code ec;
4209 status_log_timer_.expires_from_now(std::chrono::seconds(
4210 configuration_->get_log_status_interval()), ec);
4211 status_log_timer_.async_wait(
4212 std::bind(&routing_manager_impl::status_log_timer_cbk, this,
4213 std::placeholders::_1));
4214 }
4215 }
4216
4217 void
on_unsubscribe_ack(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,remote_subscription_id_t _id)4218 routing_manager_impl::on_unsubscribe_ack(client_t _client,
4219 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
4220 remote_subscription_id_t _id) {
4221 std::shared_ptr<eventgroupinfo> its_info
4222 = find_eventgroup(_service, _instance, _eventgroup);
4223 if (its_info) {
4224 update_remote_subscription_mutex_.lock();
4225 const auto its_subscription = its_info->get_remote_subscription(_id);
4226 if (its_subscription) {
4227 its_info->remove_remote_subscription(_id);
4228
4229 std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
4230 remote_subscribers_[_service][_instance].erase(_client);
4231
4232 if (its_info->get_remote_subscriptions().size() == 0) {
4233 for (const auto &its_event : its_info->get_events()) {
4234 bool has_remote_subscriber(false);
4235 for (const auto &its_eventgroup : its_event->get_eventgroups()) {
4236 const auto its_eventgroup_info
4237 = find_eventgroup(_service, _instance, its_eventgroup);
4238 if (its_eventgroup_info
4239 && its_eventgroup_info->get_remote_subscriptions().size() > 0) {
4240 has_remote_subscriber = true;
4241 }
4242 }
4243
4244 if (!has_remote_subscriber && its_event->is_shadow()) {
4245 its_event->unset_payload();
4246 }
4247 }
4248 }
4249 } else {
4250 VSOMEIP_ERROR << __func__
4251 << ": Unknown StopSubscribe " << std::dec << _id << " for eventgroup ["
4252 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4253 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
4254 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
4255 }
4256 update_remote_subscription_mutex_.unlock();
4257 } else {
4258 VSOMEIP_ERROR << __func__
4259 << ": Received StopSubscribe for unknown eventgroup: ("
4260 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
4261 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4262 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
4263 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
4264 }
4265 }
4266
on_connect(const std::shared_ptr<endpoint> & _endpoint)4267 void routing_manager_impl::on_connect(const std::shared_ptr<endpoint>& _endpoint) {
4268 (void)_endpoint;
4269 }
on_disconnect(const std::shared_ptr<endpoint> & _endpoint)4270 void routing_manager_impl::on_disconnect(const std::shared_ptr<endpoint>& _endpoint) {
4271 (void)_endpoint;
4272 }
send_subscription(const client_t _offering_client,const service_t _service,const instance_t _instance,const eventgroup_t _eventgroup,const major_version_t _major,const std::set<client_t> & _clients,const remote_subscription_id_t _id)4273 void routing_manager_impl::send_subscription(
4274 const client_t _offering_client,
4275 const service_t _service, const instance_t _instance,
4276 const eventgroup_t _eventgroup, const major_version_t _major,
4277 const std::set<client_t> &_clients,
4278 const remote_subscription_id_t _id) {
4279 if (host_->get_client() == _offering_client) {
4280 auto self = shared_from_this();
4281 for (const auto& its_client : _clients) {
4282 host_->on_subscription(_service, _instance, _eventgroup, its_client, own_uid_, own_gid_, true,
4283 [this, self, _service, _instance, _eventgroup, its_client, _id]
4284 (const bool _is_accepted) {
4285 try {
4286 if (!_is_accepted) {
4287 const auto its_callback = std::bind(
4288 &routing_manager_stub_host::on_subscribe_nack,
4289 std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4290 its_client, _service, _instance,
4291 _eventgroup, ANY_EVENT, _id, false);
4292 io_.post(its_callback);
4293 } else {
4294 const auto its_callback = std::bind(
4295 &routing_manager_stub_host::on_subscribe_ack,
4296 std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4297 its_client, _service, _instance,
4298 _eventgroup, ANY_EVENT, _id);
4299 io_.post(its_callback);
4300 }
4301 } catch (const std::exception &e) {
4302 VSOMEIP_ERROR << __func__ << e.what();
4303 }
4304 });
4305 }
4306 } else { // service hosted by local client
4307 for (const auto& its_client : _clients) {
4308 if (!stub_->send_subscribe(find_local(_offering_client), its_client,
4309 _service, _instance, _eventgroup, _major, ANY_EVENT, _id)) {
4310 try {
4311 const auto its_callback = std::bind(
4312 &routing_manager_stub_host::on_subscribe_nack,
4313 std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4314 its_client, _service, _instance, _eventgroup,
4315 ANY_EVENT, _id, true);
4316 io_.post(its_callback);
4317 } catch (const std::exception &e) {
4318 VSOMEIP_ERROR << __func__ << e.what();
4319 }
4320 }
4321 }
4322 }
4323 }
4324
cleanup_server_endpoint(service_t _service,const std::shared_ptr<endpoint> & _endpoint)4325 void routing_manager_impl::cleanup_server_endpoint(
4326 service_t _service, const std::shared_ptr<endpoint>& _endpoint) {
4327 if (_endpoint) {
4328 // Clear service_instances_, check whether any service still
4329 // uses this endpoint and clear server endpoint if no service
4330 // remains using it
4331 if (ep_mgr_impl_->remove_instance(_service, _endpoint.get())) {
4332 if (ep_mgr_impl_->remove_server_endpoint(
4333 _endpoint->get_local_port(), _endpoint->is_reliable())) {
4334 // Stop endpoint (close socket) to release its async_handlers!
4335 _endpoint->stop();
4336 }
4337 }
4338 }
4339 }
4340
pending_remote_offer_add(service_t _service,instance_t _instance)4341 pending_remote_offer_id_t routing_manager_impl::pending_remote_offer_add(
4342 service_t _service, instance_t _instance) {
4343 std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_);
4344 if (++pending_remote_offer_id_ == 0) {
4345 pending_remote_offer_id_++;
4346 }
4347 pending_remote_offers_[pending_remote_offer_id_] = std::make_pair(_service,
4348 _instance);
4349 return pending_remote_offer_id_;
4350 }
4351
pending_remote_offer_remove(pending_remote_offer_id_t _id)4352 std::pair<service_t, instance_t> routing_manager_impl::pending_remote_offer_remove(
4353 pending_remote_offer_id_t _id) {
4354 std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_);
4355 std::pair<service_t, instance_t> ret = std::make_pair(ANY_SERVICE,
4356 ANY_INSTANCE);
4357 auto found_si = pending_remote_offers_.find(_id);
4358 if (found_si != pending_remote_offers_.end()) {
4359 ret = found_si->second;
4360 pending_remote_offers_.erase(found_si);
4361 }
4362 return ret;
4363 }
4364
on_resend_provided_events_response(pending_remote_offer_id_t _id)4365 void routing_manager_impl::on_resend_provided_events_response(
4366 pending_remote_offer_id_t _id) {
4367 const std::pair<service_t, instance_t> its_service =
4368 pending_remote_offer_remove(_id);
4369 if (its_service.first != ANY_SERVICE) {
4370 // create server endpoint
4371 std::shared_ptr<serviceinfo> its_info = find_service(its_service.first,
4372 its_service.second);
4373 if (its_info) {
4374 its_info->set_ttl(DEFAULT_TTL);
4375 init_service_info(its_service.first, its_service.second, true);
4376 }
4377 }
4378 }
4379
print_stub_status() const4380 void routing_manager_impl::print_stub_status() const {
4381 stub_->print_endpoint_status();
4382 }
4383
service_endpoint_connected(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,const std::shared_ptr<endpoint> & _endpoint,bool _unreliable_only)4384 void routing_manager_impl::service_endpoint_connected(
4385 service_t _service, instance_t _instance, major_version_t _major,
4386 minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint,
4387 bool _unreliable_only) {
4388
4389 if (!_unreliable_only) {
4390 // Mark only TCP-only and TCP+UDP services available here
4391 // UDP-only services are already marked as available in add_routing_info
4392 on_availability(_service, _instance, true, _major, _minor);
4393 stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
4394 _major, _minor);
4395 }
4396
4397 std::shared_ptr<boost::asio::steady_timer> its_timer =
4398 std::make_shared<boost::asio::steady_timer>(io_);
4399 boost::system::error_code ec;
4400 its_timer->expires_from_now(std::chrono::milliseconds(3), ec);
4401 if (!ec) {
4402 its_timer->async_wait(
4403 std::bind(&routing_manager_impl::call_sd_endpoint_connected,
4404 std::static_pointer_cast<routing_manager_impl>(
4405 shared_from_this()), std::placeholders::_1,
4406 _service, _instance, _endpoint, its_timer));
4407 } else {
4408 VSOMEIP_ERROR << __func__ << " " << ec.message();
4409 }
4410 }
4411
service_endpoint_disconnected(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,const std::shared_ptr<endpoint> & _endpoint)4412 void routing_manager_impl::service_endpoint_disconnected(
4413 service_t _service, instance_t _instance, major_version_t _major,
4414 minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint) {
4415 (void)_endpoint;
4416 on_availability(_service, _instance, false, _major, _minor);
4417 stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
4418 _major, _minor);
4419 VSOMEIP_WARNING << __func__ << ": lost connection to remote service: ["
4420 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4421 << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
4422 }
4423
4424 void
send_unsubscription(client_t _offering_client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,const std::set<client_t> & _removed,remote_subscription_id_t _id)4425 routing_manager_impl::send_unsubscription(client_t _offering_client,
4426 service_t _service, instance_t _instance,
4427 eventgroup_t _eventgroup, major_version_t _major,
4428 const std::set<client_t> &_removed,
4429 remote_subscription_id_t _id) {
4430
4431 (void)_major; // TODO: Remove completely?
4432
4433 if (host_->get_client() == _offering_client) {
4434 auto self = shared_from_this();
4435 for (const auto& its_client : _removed) {
4436 host_->on_subscription(_service, _instance,
4437 _eventgroup, its_client, own_uid_, own_gid_, false,
4438 [this, self, _service, _instance, _eventgroup,
4439 its_client, _id]
4440 (const bool _is_accepted) {
4441 (void)_is_accepted;
4442 try {
4443 const auto its_callback = std::bind(
4444 &routing_manager_stub_host::on_unsubscribe_ack,
4445 std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4446 its_client, _service, _instance, _eventgroup, _id);
4447 io_.post(its_callback);
4448 } catch (const std::exception &e) {
4449 VSOMEIP_ERROR << __func__ << e.what();
4450 }
4451 }
4452 );
4453 }
4454 } else {
4455 for (const auto& its_client : _removed) {
4456 if (!stub_->send_unsubscribe(find_local(_offering_client), its_client,
4457 _service, _instance, _eventgroup, ANY_EVENT, _id)) {
4458 try {
4459 const auto its_callback = std::bind(
4460 &routing_manager_stub_host::on_unsubscribe_ack,
4461 std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4462 its_client, _service, _instance, _eventgroup, _id);
4463 io_.post(its_callback);
4464 } catch (const std::exception &e) {
4465 VSOMEIP_ERROR << __func__ << e.what();
4466 }
4467 }
4468 }
4469 }
4470 }
4471
4472 void
send_expired_subscription(client_t _offering_client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,const std::set<client_t> & _removed,remote_subscription_id_t _id)4473 routing_manager_impl::send_expired_subscription(client_t _offering_client,
4474 service_t _service, instance_t _instance,
4475 eventgroup_t _eventgroup,
4476 const std::set<client_t> &_removed,
4477 remote_subscription_id_t _id) {
4478
4479 if (host_->get_client() == _offering_client) {
4480 auto self = shared_from_this();
4481 for (const auto its_client : _removed) {
4482 host_->on_subscription(_service, _instance,
4483 _eventgroup, its_client, own_uid_, own_gid_, false,
4484 [] (const bool _subscription_accepted){
4485 (void)_subscription_accepted;
4486 });
4487 }
4488 } else {
4489 for (const auto its_client : _removed) {
4490 stub_->send_expired_subscription(find_local(_offering_client), its_client,
4491 _service, _instance, _eventgroup, ANY_EVENT, _id);
4492 }
4493 }
4494 }
4495
4496 bool
update_security_policy_configuration(uint32_t _uid,uint32_t _gid,const std::shared_ptr<policy> & _policy,const std::shared_ptr<payload> & _payload,const security_update_handler_t & _handler)4497 routing_manager_impl::update_security_policy_configuration(
4498 uint32_t _uid, uint32_t _gid,
4499 const std::shared_ptr<policy> &_policy,
4500 const std::shared_ptr<payload> &_payload,
4501 const security_update_handler_t &_handler) {
4502
4503 if (stub_)
4504 return stub_->update_security_policy_configuration(_uid, _gid,
4505 _policy, _payload, _handler);
4506
4507 return (false);
4508 }
4509
4510 bool
remove_security_policy_configuration(uint32_t _uid,uint32_t _gid,const security_update_handler_t & _handler)4511 routing_manager_impl::remove_security_policy_configuration(
4512 uint32_t _uid, uint32_t _gid,
4513 const security_update_handler_t &_handler) {
4514
4515 if (stub_)
4516 return stub_->remove_security_policy_configuration(_uid, _gid,
4517 _handler);
4518
4519 return (false);
4520 }
4521
insert_event_statistics(service_t _service,instance_t _instance,method_t _method,length_t _length)4522 bool routing_manager_impl::insert_event_statistics(service_t _service, instance_t _instance,
4523 method_t _method, length_t _length) {
4524
4525 static uint32_t its_max_messages = configuration_->get_statistics_max_messages();
4526 std::lock_guard<std::mutex> its_lock(message_statistics_mutex_);
4527 const auto its_tuple = std::make_tuple(_service, _instance, _method);
4528 const auto its_main_s = message_statistics_.find(its_tuple);
4529 if (its_main_s != message_statistics_.end()) {
4530 // increase counter and calculate moving avergae for payload length
4531 its_main_s->second.avg_length_ =
4532 (its_main_s->second.avg_length_ * its_main_s->second.counter_ + _length) /
4533 (its_main_s->second.counter_ + 1);
4534 its_main_s->second.counter_++;
4535
4536 if (its_tuple == message_to_discard_) {
4537 // check list for entry with least counter value
4538 uint32_t its_min_count(0xFFFFFFFF);
4539 auto its_tuple_to_discard = std::make_tuple(0xFFFF, 0xFFFF, 0xFFFF);
4540 for (const auto &it : message_statistics_) {
4541 if (it.second.counter_ < its_min_count) {
4542 its_min_count = it.second.counter_;
4543 its_tuple_to_discard = it.first;
4544 }
4545 }
4546 if (its_min_count != 0xFFFF
4547 && its_min_count < its_main_s->second.counter_) {
4548 // update message to discard with current message
4549 message_to_discard_ = its_tuple;
4550 }
4551 }
4552 } else {
4553 if (message_statistics_.size() < its_max_messages) {
4554 message_statistics_[its_tuple] = {1, _length};
4555 message_to_discard_ = its_tuple;
4556 } else {
4557 // no slot empty
4558 const auto it = message_statistics_.find(message_to_discard_);
4559 if (it != message_statistics_.end()
4560 && it->second.counter_ == 1) {
4561 message_statistics_.erase(message_to_discard_);
4562 message_statistics_[its_tuple] = {1, _length};
4563 message_to_discard_ = its_tuple;
4564 } else {
4565 // ignore message
4566 ignored_statistics_counter_++;
4567 return false;
4568 }
4569 }
4570 }
4571 return true;
4572 }
4573
statistics_log_timer_cbk(boost::system::error_code const & _error)4574 void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code const & _error) {
4575 if (!_error) {
4576 static uint32_t its_interval = configuration_->get_statistics_interval();
4577 its_interval = its_interval >= 1000 ? its_interval : 1000;
4578 static uint32_t its_min_freq = configuration_->get_statistics_min_freq();
4579 std::stringstream its_log;
4580 {
4581 std::lock_guard<std::mutex> its_lock(message_statistics_mutex_);
4582 for (const auto &s : message_statistics_) {
4583 if (s.second.counter_ / (its_interval / 1000) >= its_min_freq) {
4584 uint16_t its_subscribed(0);
4585 std::shared_ptr<event> its_event = find_event(std::get<0>(s.first), std::get<1>(s.first), std::get<2>(s.first));
4586 if (its_event) {
4587 if (!its_event->is_provided()) {
4588 its_subscribed = static_cast<std::uint16_t>(its_event->get_subscribers().size());
4589 }
4590 }
4591 its_log << std::hex << std::setw(4) << std::setfill('0')
4592 << std::get<0>(s.first) << "."
4593 << std::get<1>(s.first) << "."
4594 << std::get<2>(s.first) << ": #="
4595 << std::dec << s.second.counter_ << " L="
4596 << s.second.avg_length_ << " S="
4597 << std::dec << its_subscribed << ", ";
4598 }
4599 }
4600
4601 if (ignored_statistics_counter_) {
4602 its_log << std::dec << " #ignored: " << ignored_statistics_counter_;
4603 }
4604
4605 message_statistics_.clear();
4606 message_to_discard_ = std::make_tuple(0x00, 0x00, 0x00);
4607 ignored_statistics_counter_ = 0;
4608 }
4609
4610 if (its_log.str().length() > 0) {
4611 VSOMEIP_INFO << "Received events statistics: [" << its_log.str() << "]";
4612 }
4613
4614 {
4615 std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_);
4616 statistics_log_timer_.expires_from_now(std::chrono::milliseconds(its_interval));
4617 statistics_log_timer_.async_wait(
4618 std::bind(&routing_manager_impl::statistics_log_timer_cbk,
4619 this, std::placeholders::_1));
4620 }
4621 }
4622 }
4623
send_suspend() const4624 void routing_manager_impl::send_suspend() const {
4625
4626 stub_->send_suspend();
4627 }
4628
4629 } // namespace vsomeip_v3
4630