1 // Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <climits>
7 #include <iomanip>
8 #include <memory>
9 #include <sstream>
10 #include <forward_list>
11 
12 #ifndef _WIN32
13 #include <unistd.h>
14 #include <cstdio>
15 #include <time.h>
16 #endif
17 
18 #include <boost/asio/steady_timer.hpp>
19 
20 #include <vsomeip/constants.hpp>
21 #include <vsomeip/payload.hpp>
22 #include <vsomeip/runtime.hpp>
23 #include <vsomeip/internal/logger.hpp>
24 
25 #include "../include/event.hpp"
26 #include "../include/eventgroupinfo.hpp"
27 #include "../include/remote_subscription.hpp"
28 #include "../include/routing_manager_host.hpp"
29 #include "../include/routing_manager_impl.hpp"
30 #include "../include/routing_manager_stub.hpp"
31 #include "../include/serviceinfo.hpp"
32 #include "../../configuration/include/configuration.hpp"
33 #include "../../security/include/security.hpp"
34 
35 #include "../../endpoints/include/endpoint_definition.hpp"
36 #include "../../endpoints/include/local_client_endpoint_impl.hpp"
37 #include "../../endpoints/include/tcp_client_endpoint_impl.hpp"
38 #include "../../endpoints/include/tcp_server_endpoint_impl.hpp"
39 #include "../../endpoints/include/udp_client_endpoint_impl.hpp"
40 #include "../../endpoints/include/udp_server_endpoint_impl.hpp"
41 #include "../../endpoints/include/virtual_server_endpoint_impl.hpp"
42 #include "../../message/include/deserializer.hpp"
43 #include "../../message/include/message_impl.hpp"
44 #include "../../message/include/serializer.hpp"
45 #include "../../service_discovery/include/constants.hpp"
46 #include "../../service_discovery/include/defines.hpp"
47 #include "../../service_discovery/include/runtime.hpp"
48 #include "../../service_discovery/include/service_discovery.hpp"
49 #include "../../utility/include/byteorder.hpp"
50 #include "../../utility/include/utility.hpp"
51 #include "../../plugin/include/plugin_manager_impl.hpp"
52 #ifdef USE_DLT
53 #include "../../tracing/include/connector_impl.hpp"
54 #endif
55 
56 #ifndef ANDROID
57 #include "../../e2e_protection/include/buffer/buffer.hpp"
58 #include "../../e2e_protection/include/e2exf/config.hpp"
59 
60 #include "../../e2e_protection/include/e2e/profile/e2e_provider.hpp"
61 #endif
62 
63 #ifdef USE_DLT
64 #include "../../tracing/include/connector_impl.hpp"
65 #endif
66 
67 namespace vsomeip_v3 {
68 
69 #ifdef ANDROID
70 namespace sd {
~runtime()71 runtime::~runtime() {}
72 }
73 #endif
74 
routing_manager_impl(routing_manager_host * _host)75 routing_manager_impl::routing_manager_impl(routing_manager_host *_host) :
76         routing_manager_base(_host),
77         version_log_timer_(_host->get_io()),
78         if_state_running_(false),
79         sd_route_set_(false),
80         routing_running_(false),
81         status_log_timer_(_host->get_io()),
82         memory_log_timer_(_host->get_io()),
83         ep_mgr_impl_(std::make_shared<endpoint_manager_impl>(this, io_, configuration_)),
84         pending_remote_offer_id_(0),
85         last_resume_(std::chrono::steady_clock::now().min()),
86         statistics_log_timer_(_host->get_io()),
87         ignored_statistics_counter_(0)
88 {
89 }
90 
~routing_manager_impl()91 routing_manager_impl::~routing_manager_impl() {
92     utility::remove_lockfile(configuration_);
93     utility::reset_client_ids();
94 }
95 
get_io()96 boost::asio::io_service & routing_manager_impl::get_io() {
97     return routing_manager_base::get_io();
98 }
99 
get_client() const100 client_t routing_manager_impl::get_client() const {
101     return routing_manager_base::get_client();
102 }
103 
find_local_clients(service_t _service,instance_t _instance)104 std::set<client_t> routing_manager_impl::find_local_clients(service_t _service, instance_t _instance) {
105     return routing_manager_base::find_local_clients(_service, _instance);
106 }
107 
find_local_client(service_t _service,instance_t _instance)108 client_t routing_manager_impl::find_local_client(service_t _service, instance_t _instance) {
109     return routing_manager_base::find_local_client(_service, _instance);
110 }
111 
is_subscribe_to_any_event_allowed(credentials_t _credentials,client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup)112 bool routing_manager_impl::is_subscribe_to_any_event_allowed(credentials_t _credentials, client_t _client,
113         service_t _service, instance_t _instance, eventgroup_t _eventgroup) {
114     return routing_manager_base::is_subscribe_to_any_event_allowed(_credentials, _client,
115             _service, _instance, _eventgroup);
116 }
117 
init()118 void routing_manager_impl::init() {
119     routing_manager_base::init(ep_mgr_impl_);
120 
121     // TODO: Only instantiate the stub if needed
122     stub_ = std::make_shared<routing_manager_stub>(this, configuration_);
123     stub_->init();
124 
125     if (configuration_->is_sd_enabled()) {
126         VSOMEIP_INFO<< "Service Discovery enabled. Trying to load module.";
127         auto its_plugin = plugin_manager::get()->get_plugin(
128                 plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY);
129         if (its_plugin) {
130             VSOMEIP_INFO << "Service Discovery module loaded.";
131             discovery_ = std::dynamic_pointer_cast<sd::runtime>(its_plugin)->create_service_discovery(this, configuration_);
132             discovery_->init();
133         } else {
134             VSOMEIP_ERROR << "Service Discovery module could not be loaded!";
135             std::exit(EXIT_FAILURE);
136         }
137     }
138 
139 #ifndef ANDROID
140     if( configuration_->is_e2e_enabled()) {
141         VSOMEIP_INFO << "E2E protection enabled.";
142 
143         const char *its_e2e_module = getenv(VSOMEIP_ENV_E2E_PROTECTION_MODULE);
144         std::string plugin_name = its_e2e_module != nullptr ? its_e2e_module : VSOMEIP_E2E_LIBRARY;
145 
146         auto its_plugin = plugin_manager::get()->get_plugin(plugin_type_e::APPLICATION_PLUGIN, plugin_name);
147         if (its_plugin) {
148             VSOMEIP_INFO << "E2E module loaded.";
149             e2e_provider_ = std::dynamic_pointer_cast<e2e::e2e_provider>(its_plugin);
150         }
151     }
152 
153     if(e2e_provider_) {
154         std::map<e2exf::data_identifier_t, std::shared_ptr<cfg::e2e>> its_e2e_configuration = configuration_->get_e2e_configuration();
155         for (auto &identifier : its_e2e_configuration) {
156             if(!e2e_provider_->add_configuration(identifier.second)) {
157                 VSOMEIP_INFO << "Unknown E2E profile: " << identifier.second->profile << ", skipping ...";
158             }
159         }
160     }
161 #endif
162 }
163 
start()164 void routing_manager_impl::start() {
165 #ifndef _WIN32
166     boost::asio::ip::address its_multicast;
167     try {
168         its_multicast = boost::asio::ip::address::from_string(configuration_->get_sd_multicast());
169     } catch (...) {
170         VSOMEIP_ERROR << "Illegal multicast address \""
171                 << configuration_->get_sd_multicast()
172                 << "\". Please check your configuration.";
173     }
174 
175     netlink_connector_ = std::make_shared<netlink_connector>(
176             host_->get_io(), configuration_->get_unicast_address(), its_multicast);
177     netlink_connector_->register_net_if_changes_handler(
178             std::bind(&routing_manager_impl::on_net_interface_or_route_state_changed,
179             this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
180     netlink_connector_->start();
181 #else
182     {
183         std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
184         start_ip_routing();
185     }
186 #endif
187 
188     stub_->start();
189     host_->on_state(state_type_e::ST_REGISTERED);
190 
191     if (configuration_->log_version()) {
192         std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
193         version_log_timer_.expires_from_now(
194                 std::chrono::seconds(0));
195         version_log_timer_.async_wait(std::bind(&routing_manager_impl::log_version_timer_cbk,
196                 this, std::placeholders::_1));
197     }
198 #ifndef _WIN32
199     if (configuration_->log_memory()) {
200         std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_);
201         boost::system::error_code ec;
202         memory_log_timer_.expires_from_now(std::chrono::seconds(0), ec);
203         memory_log_timer_.async_wait(
204                 std::bind(&routing_manager_impl::memory_log_timer_cbk, this,
205                         std::placeholders::_1));
206     }
207 #endif
208     if (configuration_->log_status()) {
209         std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_);
210         boost::system::error_code ec;
211         status_log_timer_.expires_from_now(std::chrono::seconds(0), ec);
212         status_log_timer_.async_wait(
213                 std::bind(&routing_manager_impl::status_log_timer_cbk, this,
214                         std::placeholders::_1));
215     }
216 
217     if (configuration_->log_statistics()) {
218         std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_);
219         boost::system::error_code ec;
220         statistics_log_timer_.expires_from_now(std::chrono::seconds(0), ec);
221         statistics_log_timer_.async_wait(
222                 std::bind(&routing_manager_impl::statistics_log_timer_cbk, this,
223                         std::placeholders::_1));
224     }
225 }
226 
stop()227 void routing_manager_impl::stop() {
228     // Ensure to StopOffer all services that are offered by the application hosting the rm
229     local_services_map_t its_services;
230     {
231         std::lock_guard<std::mutex> its_lock(local_services_mutex_);
232         for (const auto& s : local_services_) {
233             for (const auto& i : s.second) {
234                 if (std::get<2>(i.second) == client_) {
235                     its_services[s.first][i.first] = i.second;
236                 }
237             }
238         }
239 
240     }
241     for (const auto& s : its_services) {
242         for (const auto& i : s.second) {
243             on_stop_offer_service(std::get<2>(i.second), s.first, i.first,
244                     std::get<0>(i.second), std::get<1>(i.second));
245         }
246     }
247 
248     {
249         std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
250         version_log_timer_.cancel();
251     }
252 #ifndef _WIN32
253     {
254         boost::system::error_code ec;
255         std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_);
256         memory_log_timer_.cancel(ec);
257     }
258     if (netlink_connector_) {
259         netlink_connector_->stop();
260     }
261 #endif
262 
263     {
264         std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_);
265         boost::system::error_code ec;
266         status_log_timer_.cancel(ec);
267     }
268 
269     {
270         std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_);
271         boost::system::error_code ec;
272         statistics_log_timer_.cancel(ec);
273     }
274 
275     host_->on_state(state_type_e::ST_DEREGISTERED);
276 
277     if (discovery_)
278         discovery_->stop();
279     stub_->stop();
280 
281     for (const auto& client : ep_mgr_->get_connected_clients()) {
282         if (client != VSOMEIP_ROUTING_CLIENT) {
283             remove_local(client, true);
284         }
285     }
286 }
287 
insert_offer_command(service_t _service,instance_t _instance,uint8_t _command,client_t _client,major_version_t _major,minor_version_t _minor)288 bool routing_manager_impl::insert_offer_command(service_t _service, instance_t _instance, uint8_t _command,
289                 client_t _client, major_version_t _major, minor_version_t _minor) {
290     std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_);
291     // flag to indicate whether caller of this function can start directly processing the command
292     bool must_process(false);
293     auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance));
294     if (found_service_instance != offer_commands_.end()) {
295         // if nothing is queued
296         if (found_service_instance->second.empty()) {
297             must_process = true;
298         }
299         found_service_instance->second.push_back(
300                 std::make_tuple(_command, _client, _major, _minor));
301     } else {
302         // nothing is queued -> add command to queue and process command directly
303         offer_commands_[std::make_pair(_service, _instance)].push_back(
304                 std::make_tuple(_command, _client, _major, _minor));
305         must_process = true;
306     }
307     return must_process;
308 }
309 
erase_offer_command(service_t _service,instance_t _instance)310 bool routing_manager_impl::erase_offer_command(service_t _service, instance_t _instance) {
311     std::lock_guard<std::mutex> its_lock(offer_serialization_mutex_);
312     auto found_service_instance = offer_commands_.find(std::make_pair(_service, _instance));
313     if (found_service_instance != offer_commands_.end()) {
314         // erase processed command
315         if (!found_service_instance->second.empty()) {
316             found_service_instance->second.pop_front();
317             if (!found_service_instance->second.empty()) {
318                 // check for other commands to be processed
319                 auto its_command = found_service_instance->second.front();
320                 if (std::get<0>(its_command) == VSOMEIP_OFFER_SERVICE) {
321                     io_.post([&, its_command, _service, _instance](){
322                         offer_service(std::get<1>(its_command), _service, _instance,
323                             std::get<2>(its_command), std::get<3>(its_command), false);
324                     });
325                 } else {
326                     io_.post([&, its_command, _service, _instance](){
327                         stop_offer_service(std::get<1>(its_command), _service, _instance,
328                             std::get<2>(its_command), std::get<3>(its_command), false);
329                     });
330                 }
331             }
332         }
333     }
334     return true;
335 }
336 
offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)337 bool routing_manager_impl::offer_service(client_t _client,
338         service_t _service, instance_t _instance,
339         major_version_t _major, minor_version_t _minor) {
340 
341     return offer_service(_client, _service, _instance, _major, _minor, true);
342 }
343 
offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,bool _must_queue)344 bool routing_manager_impl::offer_service(client_t _client,
345         service_t _service, instance_t _instance,
346         major_version_t _major, minor_version_t _minor,
347         bool _must_queue) {
348 
349     VSOMEIP_INFO << "OFFER("
350         << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
351         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
352         << std::hex << std::setw(4) << std::setfill('0') << _instance
353         << ":" << std::dec << int(_major) << "." << std::dec << _minor << "]"
354         << " (" << std::boolalpha << _must_queue << ")";
355 
356     // only queue commands if method was NOT called via erase_offer_command()
357     if (_must_queue) {
358         if (!insert_offer_command(_service, _instance, VSOMEIP_OFFER_SERVICE,
359                 _client, _major, _minor)) {
360             return false;
361         }
362     }
363 
364     // Check if the application hosted by routing manager is allowed to offer
365     // offer_service requests of local proxies are checked in rms::on:message
366     if (_client == get_client()) {
367 #ifdef _WIN32
368         std::uint32_t its_routing_uid = ANY_UID;
369         std::uint32_t its_routing_gid = ANY_GID;
370 #else
371         std::uint32_t its_routing_uid = getuid();
372         std::uint32_t its_routing_gid = getgid();
373 #endif
374         if (!security::get()->is_offer_allowed(its_routing_uid, its_routing_gid,
375                         _client, _service, _instance)) {
376             VSOMEIP_WARNING << "routing_manager_impl::offer_service: "
377                     << std::hex << "Security: Client 0x" << _client
378                     << " isn't allowed to offer the following service/instance "
379                     << _service << "/" << _instance
380                     << " ~> Skip offer!";
381             erase_offer_command(_service, _instance);
382             return false;
383         }
384     }
385 
386     if (!handle_local_offer_service(_client, _service, _instance, _major, _minor)) {
387         erase_offer_command(_service, _instance);
388         return false;
389     }
390 
391     {
392         std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
393         if (if_state_running_) {
394             init_service_info(_service, _instance, true);
395         } else {
396             pending_sd_offers_.push_back(std::make_pair(_service, _instance));
397         }
398     }
399 
400     if (discovery_) {
401         std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
402         if (its_info) {
403             discovery_->offer_service(its_info);
404         }
405     }
406 
407     {
408         std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
409         std::set<event_t> its_already_subscribed_events;
410         for (auto &ps : pending_subscriptions_) {
411             if (ps.service_ == _service
412                     && ps.instance_ == _instance
413                     && ps.major_ == _major) {
414                 insert_subscription(ps.service_, ps.instance_,
415                         ps.eventgroup_, ps.event_, client_, &its_already_subscribed_events);
416 #if 0
417                 VSOMEIP_ERROR << __func__
418                         << ": event="
419                         << std::hex << ps.service_ << "."
420                         << std::hex << ps.instance_ << "."
421                         << std::hex << ps.event_;
422 #endif
423             }
424         }
425 
426         send_pending_subscriptions(_service, _instance, _major);
427     }
428     stub_->on_offer_service(_client, _service, _instance, _major, _minor);
429     on_availability(_service, _instance, true, _major, _minor);
430     erase_offer_command(_service, _instance);
431     return true;
432 }
433 
stop_offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)434 void routing_manager_impl::stop_offer_service(client_t _client,
435         service_t _service, instance_t _instance,
436         major_version_t _major, minor_version_t _minor) {
437 
438     stop_offer_service(_client, _service, _instance, _major, _minor, true);
439 }
440 
stop_offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,bool _must_queue)441 void routing_manager_impl::stop_offer_service(client_t _client,
442         service_t _service, instance_t _instance,
443         major_version_t _major, minor_version_t _minor,
444         bool _must_queue) {
445 
446     VSOMEIP_INFO << "STOP OFFER("
447         << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
448         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
449         << std::hex << std::setw(4) << std::setfill('0') << _instance
450         << ":" << std::dec << int(_major) << "." << _minor << "]"
451         << " (" << std::boolalpha << _must_queue << ")";
452 
453     if (_must_queue) {
454         if (!insert_offer_command(_service, _instance, VSOMEIP_STOP_OFFER_SERVICE,
455                 _client, _major, _minor)) {
456             return;
457         }
458     }
459 
460     bool is_local(false);
461     {
462         std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
463         is_local = (its_info && its_info->is_local());
464     }
465     if (is_local) {
466         {
467             std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
468             for (auto it = pending_sd_offers_.begin(); it != pending_sd_offers_.end(); ) {
469                 if (it->first == _service && it->second == _instance) {
470                     it = pending_sd_offers_.erase(it);
471                     break;
472                 } else {
473                     ++it;
474                 }
475             }
476         }
477 
478         on_stop_offer_service(_client, _service, _instance, _major, _minor);
479         stub_->on_stop_offer_service(_client, _service, _instance, _major, _minor);
480         on_availability(_service, _instance, false, _major, _minor);
481     } else {
482         VSOMEIP_WARNING << __func__ << " received STOP_OFFER("
483                 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
484                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
485                 << std::hex << std::setw(4) << std::setfill('0') << _instance
486                 << ":" << std::dec << int(_major) << "." << _minor << "] "
487                 << "for remote service --> ignore";
488         erase_offer_command(_service, _instance);
489     }
490 }
491 
request_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)492 void routing_manager_impl::request_service(client_t _client, service_t _service,
493         instance_t _instance, major_version_t _major, minor_version_t _minor) {
494 
495     VSOMEIP_INFO << "REQUEST("
496         << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
497         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
498         << std::hex << std::setw(4) << std::setfill('0') << _instance << ":"
499         << std::dec << int(_major) << "." << std::dec << _minor << "]";
500 
501     routing_manager_base::request_service(_client,
502             _service, _instance, _major, _minor);
503 
504     auto its_info = find_service(_service, _instance);
505     if (!its_info) {
506         requested_service_add(_client, _service, _instance, _major, _minor);
507         if (discovery_) {
508             if (!configuration_->is_local_service(_service, _instance)) {
509                 // Non local service instance ~> tell SD to find it!
510                 discovery_->request_service(_service, _instance, _major, _minor,
511                         DEFAULT_TTL);
512             } else {
513                 VSOMEIP_INFO << std::hex
514                         << "Avoid trigger SD find-service message"
515                         << " for local service/instance/major/minor: "
516                         << _service << "/" << _instance << std::dec
517                         << "/" << (uint32_t)_major << "/" << _minor;
518             }
519         }
520     } else {
521         if ((_major == its_info->get_major()
522                 || DEFAULT_MAJOR == its_info->get_major()
523                 || ANY_MAJOR == _major)
524                 && (_minor <= its_info->get_minor()
525                         || DEFAULT_MINOR == its_info->get_minor()
526                         || _minor == ANY_MINOR)) {
527             if(!its_info->is_local()) {
528                 requested_service_add(_client, _service, _instance, _major, _minor);
529                 if (discovery_) {
530                     // Non local service instance ~> tell SD to find it!
531                     discovery_->request_service(_service, _instance, _major,
532                             _minor, DEFAULT_TTL);
533                 }
534                 its_info->add_client(_client);
535                 ep_mgr_impl_->find_or_create_remote_client(_service, _instance);
536             }
537         }
538     }
539 
540     if (_client == get_client()) {
541         stub_->create_local_receiver();
542 
543         service_data_t request = { _service, _instance, _major, _minor };
544         std::set<service_data_t> requests;
545         requests.insert(request);
546         stub_->handle_requests(_client, requests);
547     }
548 }
549 
release_service(client_t _client,service_t _service,instance_t _instance)550 void routing_manager_impl::release_service(client_t _client, service_t _service,
551         instance_t _instance) {
552 
553     VSOMEIP_INFO << "RELEASE("
554         << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
555         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
556         << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
557 
558     if (host_->get_client() == _client) {
559         std::lock_guard<std::mutex> its_lock(pending_subscription_mutex_);
560         remove_pending_subscription(_service, _instance, 0xFFFF, ANY_EVENT);
561     }
562     routing_manager_base::release_service(_client, _service, _instance);
563     requested_service_remove(_client, _service, _instance);
564 
565     std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
566     if (its_info && !its_info->is_local()) {
567         if (!its_info->get_requesters_size()) {
568             if (discovery_) {
569                 discovery_->release_service(_service, _instance);
570                 discovery_->unsubscribe_all(_service, _instance);
571             }
572             ep_mgr_impl_->clear_client_endpoints(_service, _instance, true);
573             ep_mgr_impl_->clear_client_endpoints(_service, _instance, false);
574             its_info->set_endpoint(nullptr, true);
575             its_info->set_endpoint(nullptr, false);
576             unset_all_eventpayloads(_service, _instance);
577         }
578     } else {
579         if (discovery_) {
580             discovery_->release_service(_service, _instance);
581         }
582     }
583 }
584 
subscribe(client_t _client,uid_t _uid,gid_t _gid,service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,event_t _event)585 void routing_manager_impl::subscribe(client_t _client, uid_t _uid, gid_t _gid,
586         service_t _service, instance_t _instance, eventgroup_t _eventgroup,
587         major_version_t _major, event_t _event) {
588 
589     VSOMEIP_INFO << "SUBSCRIBE("
590         << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
591         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
592         << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
593         << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << ":"
594         << std::hex << std::setw(4) << std::setfill('0') << _event << ":"
595         << std::dec << (uint16_t)_major << "]";
596     const client_t its_local_client = find_local_client(_service, _instance);
597     if (get_client() == its_local_client) {
598 #ifdef VSOMEIP_ENABLE_COMPAT
599         routing_manager_base::set_incoming_subscription_state(_client, _service, _instance,
600                 _eventgroup, _event, subscription_state_e::IS_SUBSCRIBING);
601 #endif
602         auto self = shared_from_this();
603         host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, true,
604             [this, self, _client, _uid, _gid, _service, _instance, _eventgroup,
605                 _event, _major]
606                     (const bool _subscription_accepted) {
607             (void) ep_mgr_->find_or_create_local(_client);
608             if (!_subscription_accepted) {
609                 stub_->send_subscribe_nack(_client, _service, _instance, _eventgroup, _event);
610                 VSOMEIP_INFO << "Subscription request from client: 0x" << std::hex
611                              << _client << std::dec << " for eventgroup: 0x" << _eventgroup
612                              << " rejected from application handler.";
613                 return;
614             } else {
615                 stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event);
616             }
617             routing_manager_base::subscribe(_client, _uid, _gid, _service, _instance, _eventgroup, _major, _event);
618 #ifdef VSOMEIP_ENABLE_COMPAT
619             send_pending_notify_ones(_service, _instance, _eventgroup, _client);
620             routing_manager_base::erase_incoming_subscription_state(_client, _service, _instance,
621                     _eventgroup, _event);
622 #endif
623         });
624     } else {
625         if (discovery_) {
626             std::set<event_t> its_already_subscribed_events;
627 
628             // Note: The calls to insert_subscription & handle_subscription_state must not
629             // run concurrently to a call to on_subscribe_ack. Therefore the lock is acquired
630             // before calling insert_subscription and released after the call to
631             // handle_subscription_state.
632             std::unique_lock<std::mutex> its_critical(remote_subscription_state_mutex_);
633             bool inserted = insert_subscription(_service, _instance, _eventgroup,
634                     _event, _client, &its_already_subscribed_events);
635             const bool subscriber_is_rm_host = (get_client() == _client);
636             if (inserted) {
637                 if (0 == its_local_client) {
638                     handle_subscription_state(_client, _service, _instance, _eventgroup, _event);
639                     its_critical.unlock();
640                     static const ttl_t configured_ttl(configuration_->get_sd_ttl());
641                     notify_one_current_value(_client, _service, _instance,
642                             _eventgroup, _event, its_already_subscribed_events);
643 
644                     auto its_info = find_eventgroup(_service, _instance, _eventgroup);
645                     // if the subscriber is the rm_host itself: check if service
646                     // is available before subscribing via SD otherwise we sent
647                     // a StopSubscribe/Subscribe once the first offer is received
648                     if (its_info &&
649                             (!subscriber_is_rm_host || find_service(_service, _instance))) {
650                         discovery_->subscribe(_service, _instance, _eventgroup,
651                                 _major, configured_ttl,
652                                 its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT,
653                                 its_info);
654                     }
655                 } else {
656                     its_critical.unlock();
657                     if (is_available(_service, _instance, _major)) {
658                         stub_->send_subscribe(ep_mgr_->find_local(_service, _instance),
659                                _client, _service, _instance, _eventgroup, _major, _event,
660                                PENDING_SUBSCRIPTION_ID);
661                     }
662                 }
663             }
664             if (subscriber_is_rm_host) {
665                 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
666                 subscription_data_t subscription = {
667                     _service, _instance, _eventgroup, _major, _event, _uid, _gid
668                 };
669                 pending_subscriptions_.insert(subscription);
670             }
671         } else {
672             VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!";
673         }
674     }
675 }
676 
unsubscribe(client_t _client,uid_t _uid,gid_t _gid,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)677 void routing_manager_impl::unsubscribe(client_t _client, uid_t _uid, gid_t _gid,
678     service_t _service, instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
679 
680     VSOMEIP_INFO << "UNSUBSCRIBE("
681         << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
682         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
683         << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
684         << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
685         << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
686 
687     bool last_subscriber_removed(true);
688 
689     std::shared_ptr<eventgroupinfo> its_info
690         = find_eventgroup(_service, _instance, _eventgroup);
691     if (its_info) {
692         for (const auto& e : its_info->get_events()) {
693             if (e->get_event() == _event || ANY_EVENT == _event)
694                 e->remove_subscriber(_eventgroup, _client);
695         }
696         for (const auto& e : its_info->get_events()) {
697             if (e->has_subscriber(_eventgroup, ANY_CLIENT)) {
698                 last_subscriber_removed = false;
699                 break;
700             }
701         }
702     }
703 
704     if (discovery_) {
705         host_->on_subscription(_service, _instance, _eventgroup, _client, _uid, _gid, false,
706                 [](const bool _subscription_accepted){ (void)_subscription_accepted; });
707         if (0 == find_local_client(_service, _instance)) {
708             if (get_client() == _client) {
709                 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
710                 remove_pending_subscription(_service, _instance, _eventgroup, _event);
711             }
712             if (last_subscriber_removed) {
713                 unset_all_eventpayloads(_service, _instance, _eventgroup);
714                 {
715                     auto tuple = std::make_tuple(_service, _instance, _eventgroup, _client);
716                     std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
717                     remote_subscription_state_.erase(tuple);
718                 }
719             }
720 
721             if (its_info &&
722                     (last_subscriber_removed || its_info->is_selective())) {
723 
724                 discovery_->unsubscribe(_service, _instance, _eventgroup,
725                         its_info->is_selective() ? _client : VSOMEIP_ROUTING_CLIENT);
726             }
727         } else {
728             if (get_client() == _client) {
729                 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
730                 remove_pending_subscription(_service, _instance, _eventgroup, _event);
731                 stub_->send_unsubscribe(
732                         ep_mgr_->find_local(_service, _instance),
733                         _client, _service, _instance, _eventgroup, _event,
734                         PENDING_SUBSCRIPTION_ID);
735             }
736         }
737         ep_mgr_impl_->clear_multicast_endpoints(_service, _instance);
738 
739     } else {
740         VSOMEIP_ERROR<< "SOME/IP eventgroups require SD to be enabled!";
741     }
742 }
743 
send(client_t _client,std::shared_ptr<message> _message)744 bool routing_manager_impl::send(client_t _client,
745         std::shared_ptr<message> _message) {
746     return routing_manager_base::send(_client, _message);
747 }
748 
send(client_t _client,const byte_t * _data,length_t _size,instance_t _instance,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _status_check,bool _sent_from_remote)749 bool routing_manager_impl::send(client_t _client, const byte_t *_data,
750         length_t _size, instance_t _instance, bool _reliable,
751         client_t _bound_client,
752         credentials_t _credentials,
753         uint8_t _status_check, bool _sent_from_remote) {
754     bool is_sent(false);
755     if (_size > VSOMEIP_MESSAGE_TYPE_POS) {
756         std::shared_ptr<endpoint> its_target;
757         bool is_request = utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS]);
758         bool is_notification = utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS]);
759         bool is_response = utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS]);
760 
761         client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
762                 _data[VSOMEIP_CLIENT_POS_MAX]);
763         service_t its_service = VSOMEIP_BYTES_TO_WORD(
764                 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
765         method_t its_method = VSOMEIP_BYTES_TO_WORD(
766                 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
767 
768         bool is_service_discovery
769             = (its_service == sd::service && its_method == sd::method);
770 
771         if (is_request) {
772             its_target = ep_mgr_->find_local(its_service, _instance);
773         } else if (!is_notification) {
774             its_target = find_local(its_client);
775         } else if (is_notification && _client && !is_service_discovery) { // Selective notifications!
776             if (_client == get_client()) {
777 #ifdef USE_DLT
778                 const uint16_t its_data_size
779                     = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
780 
781                 trace::header its_header;
782                 if (its_header.prepare(its_target, true, _instance))
783                     tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
784                             _data, its_data_size);
785 #endif
786                 deliver_message(_data, _size, _instance, _reliable, _bound_client, _credentials, _status_check, _sent_from_remote);
787                 return true;
788             }
789             its_target = find_local(_client);
790         }
791 
792         if (its_target) {
793 #ifdef USE_DLT
794             if ((is_request && its_client == get_client()) ||
795                     (is_response && find_local_client(its_service, _instance) == get_client()) ||
796                     (is_notification && find_local_client(its_service, _instance) == VSOMEIP_ROUTING_CLIENT)) {
797                 const uint16_t its_data_size
798                     = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
799 
800                 trace::header its_header;
801                 if (its_header.prepare(its_target, true, _instance))
802                     tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
803                             _data, its_data_size);
804             }
805 #endif
806             is_sent = send_local(its_target, get_client(), _data, _size, _instance, _reliable, VSOMEIP_SEND, _status_check);
807         } else {
808             // Check whether hosting application should get the message
809             // If not, check routes to external
810             if ((its_client == host_->get_client() && is_response)
811                     || (find_local_client(its_service, _instance)
812                             == host_->get_client() && is_request)) {
813                 // TODO: Find out how to handle session id here
814                 is_sent = deliver_message(_data, _size, _instance, _reliable, VSOMEIP_ROUTING_CLIENT, _credentials, _status_check);
815             } else {
816                 e2e_buffer its_buffer;
817 
818                 if (e2e_provider_) {
819                     if ( !is_service_discovery) {
820                         service_t its_service = VSOMEIP_BYTES_TO_WORD(
821                                 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
822                         method_t its_method = VSOMEIP_BYTES_TO_WORD(
823                                 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
824 #ifndef ANDROID
825                         if (e2e_provider_->is_protected({its_service, its_method})) {
826                             // Find out where the protected area starts
827                             size_t its_base = e2e_provider_->get_protection_base({its_service, its_method});
828 
829                             // Build a corresponding buffer
830                             its_buffer.assign(_data + its_base, _data + _size);
831 
832                             e2e_provider_->protect({ its_service, its_method }, its_buffer, _instance);
833 
834                             // Prepend header
835                             its_buffer.insert(its_buffer.begin(), _data, _data + its_base);
836 
837                             _data = its_buffer.data();
838                        }
839 #endif
840                     }
841                 }
842                 if (is_request) {
843                     its_target = ep_mgr_impl_->find_or_create_remote_client(
844                             its_service, _instance, _reliable);
845                     if (its_target) {
846 #ifdef USE_DLT
847                         const uint16_t its_data_size
848                             = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
849 
850                         trace::header its_header;
851                         if (its_header.prepare(its_target, true, _instance))
852                             tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
853                                     _data, its_data_size);
854 #endif
855                         is_sent = its_target->send(_data, _size);
856                     } else {
857                         const session_t its_session = VSOMEIP_BYTES_TO_WORD(
858                                 _data[VSOMEIP_SESSION_POS_MIN],
859                                 _data[VSOMEIP_SESSION_POS_MAX]);
860                         VSOMEIP_ERROR<< "Routing info for remote service could not be found! ("
861                                 << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
862                                 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
863                                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
864                                 << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
865                                 << std::hex << std::setw(4) << std::setfill('0') << its_session;
866                     }
867                 } else {
868                     std::shared_ptr<serviceinfo> its_info(find_service(its_service, _instance));
869                     if (its_info || is_service_discovery) {
870                         if (is_notification && !is_service_discovery) {
871                             send_local_notification(get_client(), _data, _size, _instance, _reliable, _status_check);
872                             method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
873                                     _data[VSOMEIP_METHOD_POS_MAX]);
874                             std::shared_ptr<event> its_event = find_event(its_service, _instance, its_method);
875                             if (its_event) {
876 #ifdef USE_DLT
877                                 bool has_sent(false);
878 #endif
879                                 std::set<std::shared_ptr<endpoint_definition>> its_targets;
880                                 // we need both endpoints as clients can subscribe to events via TCP and UDP
881                                 std::shared_ptr<endpoint> its_udp_server_endpoint = its_info->get_endpoint(false);
882                                 std::shared_ptr<endpoint> its_tcp_server_endpoint = its_info->get_endpoint(true);
883 
884                                 if (its_udp_server_endpoint || its_tcp_server_endpoint) {
885                                     const auto its_reliability = its_event->get_reliability();
886                                     for (auto its_group : its_event->get_eventgroups()) {
887                                         auto its_eventgroup = find_eventgroup(its_service, _instance, its_group);
888                                         if (its_eventgroup) {
889                                             // Unicast targets
890                                             for (const auto &its_remote : its_eventgroup->get_unicast_targets()) {
891                                                 if (its_remote->is_reliable() && its_tcp_server_endpoint) {
892                                                     if (its_reliability == reliability_type_e::RT_RELIABLE
893                                                             || its_reliability == reliability_type_e::RT_BOTH) {
894                                                         its_targets.insert(its_remote);
895                                                     }
896                                                 } else if (its_udp_server_endpoint && !its_eventgroup->is_sending_multicast()) {
897                                                     if (its_reliability == reliability_type_e::RT_UNRELIABLE
898                                                             || its_reliability == reliability_type_e::RT_BOTH) {
899                                                         its_targets.insert(its_remote);
900                                                     }
901                                                 }
902                                             }
903                                             // Send to multicast targets if subscribers are still interested
904                                             if (its_eventgroup->is_sending_multicast()) {
905                                                 if (its_reliability == reliability_type_e::RT_UNRELIABLE
906                                                         || its_reliability == reliability_type_e::RT_BOTH) {
907                                                     boost::asio::ip::address its_address;
908                                                     uint16_t its_port;
909                                                     if (its_eventgroup->get_multicast(its_address, its_port)) {
910                                                         std::shared_ptr<endpoint_definition> its_multicast_target;
911                                                         its_multicast_target = endpoint_definition::get(its_address,
912                                                                 its_port, false, its_service, _instance);
913                                                         its_targets.insert(its_multicast_target);
914                                                     }
915                                                 }
916                                             }
917                                         }
918                                     }
919                                 }
920 
921                                 for (auto const &target : its_targets) {
922                                     if (target->is_reliable()) {
923                                         its_tcp_server_endpoint->send_to(target, _data, _size);
924                                     } else {
925                                         its_udp_server_endpoint->send_to(target, _data, _size);
926                                     }
927 #ifdef USE_DLT
928                                     has_sent = true;
929 #endif
930                                 }
931 #ifdef USE_DLT
932                                 if (has_sent) {
933                                     const uint16_t its_data_size
934                                         = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
935 
936                                     trace::header its_header;
937                                     if (its_header.prepare(nullptr, true, _instance))
938                                         tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
939                                                 _data, its_data_size);
940                                 }
941 #endif
942                             }
943                         } else {
944                             if ((utility::is_response(_data[VSOMEIP_MESSAGE_TYPE_POS])
945                                  || utility::is_error(_data[VSOMEIP_MESSAGE_TYPE_POS]))
946                                     && !its_info->is_local()) {
947                                 // We received a response/error but neither the hosting application
948                                 // nor another local client could be found --> drop
949                                 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
950                                         _data[VSOMEIP_SESSION_POS_MIN],
951                                         _data[VSOMEIP_SESSION_POS_MAX]);
952                                 VSOMEIP_ERROR
953                                     << "routing_manager_impl::send: Received response/error for unknown client ("
954                                     << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
955                                     << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
956                                     << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
957                                     << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
958                                     << std::hex << std::setw(4) << std::setfill('0') << its_session;
959                                 return false;
960                             }
961                             its_target = is_service_discovery ?
962                                          (sd_info_ ? sd_info_->get_endpoint(false) : nullptr) : its_info->get_endpoint(_reliable);
963                             if (its_target) {
964 #ifdef USE_DLT
965                                 const uint16_t its_data_size
966                                     = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
967 
968                                 trace::header its_header;
969                                 if (its_header.prepare(its_target, true, _instance))
970                                     tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
971                                             _data, its_data_size);
972 #endif
973                                 is_sent = its_target->send(_data, _size);
974                             } else {
975                                 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
976                                         _data[VSOMEIP_SESSION_POS_MIN],
977                                         _data[VSOMEIP_SESSION_POS_MAX]);
978                                 VSOMEIP_ERROR << "Routing error. Endpoint for service ("
979                                         << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
980                                         << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
981                                         << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
982                                         << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
983                                         << std::hex << std::setw(4) << std::setfill('0') << its_session
984                                         << " could not be found!";
985                             }
986                         }
987                     } else {
988                         if (!is_notification) {
989                             const session_t its_session = VSOMEIP_BYTES_TO_WORD(
990                                     _data[VSOMEIP_SESSION_POS_MIN],
991                                     _data[VSOMEIP_SESSION_POS_MAX]);
992                             VSOMEIP_ERROR << "Routing error. Not hosting service ("
993                                     << std::hex << std::setw(4) << std::setfill('0') << its_client << "): ["
994                                     << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
995                                     << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
996                                     << std::hex << std::setw(4) << std::setfill('0') << its_method << "] "
997                                     << std::hex << std::setw(4) << std::setfill('0') << its_session;
998                         }
999                     }
1000                 }
1001             }
1002         }
1003     }
1004 
1005     return (is_sent);
1006 }
1007 
send_to(const client_t _client,const std::shared_ptr<endpoint_definition> & _target,std::shared_ptr<message> _message)1008 bool routing_manager_impl::send_to(
1009         const client_t _client,
1010         const std::shared_ptr<endpoint_definition> &_target,
1011         std::shared_ptr<message> _message) {
1012     bool is_sent(false);
1013 
1014     std::shared_ptr<serializer> its_serializer(get_serializer());
1015     if (its_serializer->serialize(_message.get())) {
1016         const byte_t *its_data = its_serializer->get_data();
1017         length_t its_size = its_serializer->get_size();
1018         e2e_buffer its_buffer;
1019         if (e2e_provider_) {
1020             service_t its_service = VSOMEIP_BYTES_TO_WORD(
1021                     its_data[VSOMEIP_SERVICE_POS_MIN],
1022                     its_data[VSOMEIP_SERVICE_POS_MAX]);
1023             method_t its_method = VSOMEIP_BYTES_TO_WORD(
1024                     its_data[VSOMEIP_METHOD_POS_MIN],
1025                     its_data[VSOMEIP_METHOD_POS_MAX]);
1026 #ifndef ANDROID
1027             if (e2e_provider_->is_protected({its_service, its_method})) {
1028                 auto its_base = e2e_provider_->get_protection_base({its_service, its_method});
1029                 its_buffer.assign(its_data + its_base, its_data + its_size);
1030                 e2e_provider_->protect({its_service, its_method}, its_buffer, _message->get_instance());
1031                 its_buffer.insert(its_buffer.begin(), its_data, its_data + its_base);
1032                 its_data = its_buffer.data();
1033            }
1034 #endif
1035         }
1036 
1037         const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MIN] = VSOMEIP_WORD_BYTE1(_client);
1038         const_cast<byte_t*>(its_data)[VSOMEIP_CLIENT_POS_MAX] = VSOMEIP_WORD_BYTE0(_client);
1039 
1040         is_sent = send_to(_target, its_data, its_size, _message->get_instance());
1041 
1042         its_serializer->reset();
1043         put_serializer(its_serializer);
1044     } else {
1045         VSOMEIP_ERROR<< "routing_manager_impl::send_to: serialization failed.";
1046     }
1047     return (is_sent);
1048 }
1049 
send_to(const std::shared_ptr<endpoint_definition> & _target,const byte_t * _data,uint32_t _size,instance_t _instance)1050 bool routing_manager_impl::send_to(
1051         const std::shared_ptr<endpoint_definition> &_target,
1052         const byte_t *_data, uint32_t _size, instance_t _instance) {
1053     std::shared_ptr<endpoint> its_endpoint =
1054             ep_mgr_impl_->find_server_endpoint(
1055                     _target->get_remote_port(), _target->is_reliable());
1056 
1057     if (its_endpoint) {
1058 #ifdef USE_DLT
1059         const uint16_t its_data_size
1060             = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
1061 
1062         trace::header its_header;
1063         if (its_header.prepare(its_endpoint, true, _instance))
1064             tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
1065                     _data, its_data_size);
1066 #else
1067         (void) _instance;
1068 #endif
1069         return its_endpoint->send_to(_target, _data, _size);
1070     }
1071     return false;
1072 }
1073 
send_via_sd(const std::shared_ptr<endpoint_definition> & _target,const byte_t * _data,uint32_t _size,uint16_t _sd_port)1074 bool routing_manager_impl::send_via_sd(
1075         const std::shared_ptr<endpoint_definition> &_target,
1076         const byte_t *_data, uint32_t _size, uint16_t _sd_port) {
1077     std::shared_ptr<endpoint> its_endpoint =
1078             ep_mgr_impl_->find_server_endpoint(_sd_port,
1079                     _target->is_reliable());
1080 
1081     if (its_endpoint) {
1082 #ifdef USE_DLT
1083         if (tc_->is_sd_enabled()) {
1084             const uint16_t its_data_size
1085                 = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
1086 
1087             trace::header its_header;
1088             if (its_header.prepare(its_endpoint, true, 0x0))
1089                 tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
1090                         _data, its_data_size);
1091 
1092         }
1093 #endif
1094         return its_endpoint->send_to(_target, _data, _size);
1095     }
1096 
1097     return false;
1098 }
1099 
register_event(client_t _client,service_t _service,instance_t _instance,event_t _notifier,const std::set<eventgroup_t> & _eventgroups,const event_type_e _type,reliability_type_e _reliability,std::chrono::milliseconds _cycle,bool _change_resets_cycle,bool _update_on_change,epsilon_change_func_t _epsilon_change_func,bool _is_provided,bool _is_shadow,bool _is_cache_placeholder)1100 void routing_manager_impl::register_event(client_t _client,
1101         service_t _service, instance_t _instance,
1102         event_t _notifier,
1103         const std::set<eventgroup_t> &_eventgroups, const event_type_e _type,
1104         reliability_type_e _reliability,
1105         std::chrono::milliseconds _cycle, bool _change_resets_cycle,
1106         bool _update_on_change,
1107         epsilon_change_func_t _epsilon_change_func,
1108         bool _is_provided, bool _is_shadow, bool _is_cache_placeholder) {
1109     auto its_event = find_event(_service, _instance, _notifier);
1110     bool is_first(false);
1111     if (its_event) {
1112         if (!its_event->has_ref(_client, _is_provided)) {
1113             is_first = true;
1114         }
1115     } else {
1116         is_first = true;
1117     }
1118     if (is_first) {
1119         routing_manager_base::register_event(_client,
1120                 _service, _instance,
1121                 _notifier,
1122                 _eventgroups, _type, _reliability,
1123                 _cycle, _change_resets_cycle, _update_on_change,
1124                 _epsilon_change_func, _is_provided, _is_shadow,
1125                 _is_cache_placeholder);
1126     }
1127     VSOMEIP_INFO << "REGISTER EVENT("
1128         << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
1129         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1130         << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
1131         << std::hex << std::setw(4) << std::setfill('0') << _notifier
1132         << ":is_provider=" << std::boolalpha << _is_provided << "]";
1133 }
1134 
register_shadow_event(client_t _client,service_t _service,instance_t _instance,event_t _notifier,const std::set<eventgroup_t> & _eventgroups,event_type_e _type,reliability_type_e _reliability,bool _is_provided)1135 void routing_manager_impl::register_shadow_event(client_t _client,
1136         service_t _service, instance_t _instance,
1137         event_t _notifier,
1138         const std::set<eventgroup_t> &_eventgroups, event_type_e _type,
1139         reliability_type_e _reliability, bool _is_provided) {
1140     routing_manager_base::register_event(_client,
1141             _service, _instance,
1142             _notifier,
1143             _eventgroups, _type, _reliability,
1144             std::chrono::milliseconds::zero(), false, true,
1145             nullptr,
1146             _is_provided, true);
1147 }
1148 
unregister_shadow_event(client_t _client,service_t _service,instance_t _instance,event_t _event,bool _is_provided)1149 void routing_manager_impl::unregister_shadow_event(client_t _client,
1150         service_t _service, instance_t _instance,
1151         event_t _event, bool _is_provided) {
1152     routing_manager_base::unregister_event(_client, _service, _instance,
1153             _event, _is_provided);
1154 }
1155 
notify_one(service_t _service,instance_t _instance,event_t _event,std::shared_ptr<payload> _payload,client_t _client,bool _force,bool _remote_subscriber)1156 void routing_manager_impl::notify_one(service_t _service, instance_t _instance,
1157         event_t _event, std::shared_ptr<payload> _payload, client_t _client,
1158         bool _force
1159 #ifdef VSOMEIP_ENABLE_COMPAT
1160         , bool _remote_subscriber
1161 #endif
1162         ) {
1163     if (find_local(_client)) {
1164         routing_manager_base::notify_one(_service, _instance, _event, _payload,
1165                 _client, _force
1166 #ifdef VSOMEIP_ENABLE_COMPAT
1167                 , _remote_subscriber
1168 #endif
1169                 );
1170     } else {
1171         std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
1172         if (its_event) {
1173             std::set<std::shared_ptr<endpoint_definition> > its_targets;
1174             const auto its_reliability = its_event->get_reliability();
1175             for (const auto& g : its_event->get_eventgroups()) {
1176                 const auto its_eventgroup = find_eventgroup(_service, _instance, g);
1177                 if (its_eventgroup) {
1178                     const auto its_subscriptions = its_eventgroup->get_remote_subscriptions();
1179                     for (const auto &s : its_subscriptions) {
1180                         if (s->has_client(_client)) {
1181                             if (its_reliability == reliability_type_e::RT_RELIABLE
1182                                     || its_reliability == reliability_type_e::RT_BOTH) {
1183                                 const auto its_reliable = s->get_reliable();
1184                                 if (its_reliable)
1185                                     its_targets.insert(its_reliable);
1186                             }
1187                             if (its_reliability == reliability_type_e::RT_UNRELIABLE
1188                                     || its_reliability == reliability_type_e::RT_BOTH) {
1189                                 const auto its_unreliable = s->get_unreliable();
1190                                 if (its_unreliable)
1191                                     its_targets.insert(its_unreliable);
1192                             }
1193                         }
1194                     }
1195                 }
1196             }
1197 
1198             if (its_targets.size() > 0) {
1199                 for (const auto &its_target : its_targets) {
1200                     its_event->set_payload(_payload, _client, its_target, _force);
1201                 }
1202             }
1203         } else {
1204             VSOMEIP_WARNING << "Attempt to update the undefined event/field ["
1205                 << std::hex << _service << "." << _instance << "." << _event
1206                 << "]";
1207         }
1208     }
1209 }
1210 
on_availability(service_t _service,instance_t _instance,bool _is_available,major_version_t _major,minor_version_t _minor)1211 void routing_manager_impl::on_availability(service_t _service, instance_t _instance,
1212         bool _is_available, major_version_t _major, minor_version_t _minor) {
1213 
1214     // insert subscriptions of routing manager into service discovery
1215     // to send SubscribeEventgroup after StopOffer / Offer was received
1216     if (_is_available) {
1217         if (discovery_) {
1218             const client_t its_local_client = find_local_client(_service, _instance);
1219             // remote service
1220             if (VSOMEIP_ROUTING_CLIENT == its_local_client) {
1221                 static const ttl_t configured_ttl(configuration_->get_sd_ttl());
1222 
1223                 std::lock_guard<std::mutex> ist_lock(pending_subscription_mutex_);
1224                 for (auto &ps : pending_subscriptions_) {
1225                     if (ps.service_ == _service
1226                             && ps.instance_ == _instance
1227                             && ps.major_ == _major) {
1228                         auto its_info = find_eventgroup(_service, _instance, ps.eventgroup_);
1229                         if (its_info) {
1230                             discovery_->subscribe(
1231                                     _service,
1232                                     _instance,
1233                                     ps.eventgroup_,
1234                                     _major,
1235                                     configured_ttl,
1236                                     its_info->is_selective() ? get_client() : VSOMEIP_ROUTING_CLIENT,
1237                                     its_info);
1238                         }
1239                     }
1240                 }
1241             }
1242         }
1243     }
1244     host_->on_availability(_service, _instance, _is_available, _major, _minor);
1245 }
1246 
1247 
offer_service_remotely(service_t _service,instance_t _instance,std::uint16_t _port,bool _reliable,bool _magic_cookies_enabled)1248 bool routing_manager_impl::offer_service_remotely(service_t _service,
1249                                                   instance_t _instance,
1250                                                   std::uint16_t _port,
1251                                                   bool _reliable,
1252                                                   bool _magic_cookies_enabled) {
1253     bool ret = true;
1254 
1255     if(!is_available(_service, _instance, ANY_MAJOR)) {
1256         VSOMEIP_ERROR << __func__ << ": Service ["
1257                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1258                 << std::hex << std::setw(4) << std::setfill('0') << _instance
1259                 << "] is not offered locally! Won't offer it remotely.";
1260         ret = false;
1261     } else {
1262         // update service info in configuration
1263         if (!configuration_->remote_offer_info_add(_service, _instance, _port,
1264                 _reliable, _magic_cookies_enabled)) {
1265             ret = false;
1266         } else {
1267             // trigger event registration again to create shadow events
1268             const client_t its_offering_client = find_local_client(_service, _instance);
1269             if (its_offering_client == VSOMEIP_ROUTING_CLIENT) {
1270                 VSOMEIP_ERROR << __func__ << " didn't find offering client for service ["
1271                         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1272                         << std::hex << std::setw(4) << std::setfill('0') << _instance
1273                         << "]";
1274                 ret = false;
1275             } else {
1276                 if (!stub_->send_provided_event_resend_request(its_offering_client,
1277                         pending_remote_offer_add(_service, _instance))) {
1278                     VSOMEIP_ERROR << __func__ << ": Couldn't send event resend"
1279                         << "request to client 0x" << std::hex << std::setw(4)
1280                         << std::setfill('0') << its_offering_client << " providing service ["
1281                         << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1282                         << std::hex << std::setw(4) << std::setfill('0') << _instance
1283                         << "]";
1284 
1285                     ret = false;
1286                 }
1287             }
1288         }
1289     }
1290     return ret;
1291 }
1292 
stop_offer_service_remotely(service_t _service,instance_t _instance,std::uint16_t _port,bool _reliable,bool _magic_cookies_enabled)1293 bool routing_manager_impl::stop_offer_service_remotely(service_t _service,
1294                                                        instance_t _instance,
1295                                                        std::uint16_t _port,
1296                                                        bool _reliable,
1297                                                        bool _magic_cookies_enabled) {
1298     bool ret = true;
1299     bool service_still_offered_remote(false);
1300     // update service configuration
1301     if (!configuration_->remote_offer_info_remove(_service, _instance, _port,
1302             _reliable, _magic_cookies_enabled, &service_still_offered_remote)) {
1303         VSOMEIP_ERROR << __func__ << " couldn't remove remote offer info for service ["
1304                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1305                 << std::hex << std::setw(4) << std::setfill('0') << _instance
1306                 << "] from configuration";
1307         ret = false;
1308     }
1309     std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
1310     std::shared_ptr<endpoint> its_server_endpoint;
1311     if (its_info) {
1312         its_server_endpoint = its_info->get_endpoint(_reliable);
1313     }
1314     // don't deregister events if the service is still offered remotely
1315     if (!service_still_offered_remote) {
1316         const client_t its_offering_client = find_local_client(_service, _instance);
1317         major_version_t its_major(0);
1318         minor_version_t its_minor(0);
1319         if (its_info) {
1320             its_major = its_info->get_major();
1321             its_minor = its_info->get_minor();
1322         }
1323         // unset payload and clear subcribers
1324         routing_manager_base::stop_offer_service(its_offering_client,
1325                 _service, _instance, its_major, its_minor);
1326         // unregister events
1327         for (const event_t its_event_id : find_events(_service, _instance)) {
1328             unregister_shadow_event(its_offering_client, _service, _instance,
1329                     its_event_id, true);
1330         }
1331         clear_targets_and_pending_sub_from_eventgroups(_service, _instance);
1332         clear_remote_subscriber(_service, _instance);
1333 
1334         if (discovery_ && its_info) {
1335             discovery_->stop_offer_service(its_info);
1336             its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable);
1337         }
1338     } else {
1339         // service is still partly offered
1340         if (discovery_ && its_info) {
1341             std::shared_ptr<serviceinfo> its_copied_info =
1342                     std::make_shared<serviceinfo>(*its_info);
1343             its_info->set_endpoint(std::shared_ptr<endpoint>(), _reliable);
1344             // ensure to not send StopOffer for endpoint on which the service is
1345             // still offered
1346             its_copied_info->set_endpoint(std::shared_ptr<endpoint>(), !_reliable);
1347             discovery_->stop_offer_service(its_copied_info);
1348         }
1349     }
1350 
1351     cleanup_server_endpoint(_service, its_server_endpoint);
1352     return ret;
1353 }
1354 
// Entry point for a raw message received on an endpoint.
//
// _data / _size       the received bytes.
// _receiver           endpoint the message was received on.
// _destination        address the message was sent to; used to detect
//                     multicast reception.
// _bound_client       forwarded unchanged to the common message handler.
// _credentials        forwarded unchanged to the common message handler.
// _remote_address /
// _remote_port        sender of the message.
//
// Messages shorter than VSOMEIP_SOMEIP_HEADER_SIZE are not processed.
// Messages for VSOMEIP_SD_SERVICE are passed to service discovery, all other
// messages are validated (message type, check_error(), optional security
// and E2E checks) and then passed to the common on_message() overload.
// If USE_DLT is defined, the message is traced at the end unless the common
// handler reported that it was not forwarded; note that every early return
// below also skips tracing.
void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
        endpoint *_receiver, const boost::asio::ip::address &_destination,
        client_t _bound_client, credentials_t _credentials,
        const boost::asio::ip::address &_remote_address,
        std::uint16_t _remote_port) {
#if 0
    std::stringstream msg;
    msg << "rmi::on_message: ";
    for (uint32_t i = 0; i < _size; ++i)
    msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
    VSOMEIP_INFO << msg.str();
#endif
    // NOTE: _bound_client is nevertheless passed on to the common handler
    // at the end of this function.
    (void)_bound_client;
    service_t its_service;
    method_t its_method;
    uint8_t its_check_status = e2e::profile_interface::generic_check_status::E2E_OK;
    instance_t its_instance(0x0);
#ifdef USE_DLT
    // Stays true for SD messages and for messages that are too short, so
    // those are traced as well.
    bool is_forwarded(true);
#endif
    if (_size >= VSOMEIP_SOMEIP_HEADER_SIZE) {
        its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
                _data[VSOMEIP_SERVICE_POS_MAX]);
        if (its_service == VSOMEIP_SD_SERVICE) {
            // Service discovery message: only accepted if it was sent from
            // the configured SD port and from a specified address. It is
            // silently ignored if service discovery is not active or the
            // method id is not the SD method.
            its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
                            _data[VSOMEIP_METHOD_POS_MAX]);
            if (discovery_ && its_method == sd::method) {
                if (configuration_->get_sd_port() == _remote_port) {
                    if (!_remote_address.is_unspecified()) {
                        discovery_->on_message(_data, _size, _remote_address, _destination);
                    } else {
                        VSOMEIP_ERROR << "Ignored SD message from unknown address.";
                    }
                } else {
                    VSOMEIP_ERROR << "Ignored SD message from unknown port ("
                            << _remote_port << ")";
                }
            }
        } else {
            // Determine the instance: for multicast reception via the
            // sender's address, otherwise via the receiving endpoint.
            if(_destination.is_multicast()) {
                its_instance = ep_mgr_impl_->find_instance_multicast(its_service, _remote_address);
            } else {
                its_instance = ep_mgr_impl_->find_instance(its_service, _receiver);
            }
            // NOTE(review): 0xFFFF presumably marks "no instance found" -
            // confirm against find_instance(). The condition is only logged
            // here; processing continues with this instance value.
            if (its_instance == 0xFFFF) {
                its_method = VSOMEIP_BYTES_TO_WORD(
                        _data[VSOMEIP_METHOD_POS_MIN],
                        _data[VSOMEIP_METHOD_POS_MAX]);
                const client_t its_client = VSOMEIP_BYTES_TO_WORD(
                        _data[VSOMEIP_CLIENT_POS_MIN],
                        _data[VSOMEIP_CLIENT_POS_MAX]);
                const session_t its_session = VSOMEIP_BYTES_TO_WORD(
                        _data[VSOMEIP_SESSION_POS_MIN],
                        _data[VSOMEIP_SESSION_POS_MAX]);
                boost::system::error_code ec;
                VSOMEIP_ERROR << "Received message on invalid port: ["
                        << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
                        << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
                        << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
                        << std::hex << std::setw(4) << std::setfill('0') << its_client << "."
                        << std::hex << std::setw(4) << std::setfill('0') << its_session << "] from: "
                        << _remote_address.to_string(ec) << ":" << std::dec << _remote_port;
            }
            //Ignore messages with invalid message type
            // NOTE(review): this size test is presumably always true because
            // of the header-size check above - confirm that
            // VSOMEIP_MESSAGE_TYPE_POS lies within the header.
            if(_size >= VSOMEIP_MESSAGE_TYPE_POS) {
                if(!utility::is_valid_message_type(static_cast<message_type_e>(_data[VSOMEIP_MESSAGE_TYPE_POS]))) {
                    VSOMEIP_ERROR << "Ignored SomeIP message with invalid message type.";
                    return;
                }
            }
            // Return codes other than E_OK / E_NOT_OK are treated as errors:
            // they are answered with an error message, except for
            // "request no return" messages, which are dropped silently.
            return_code_e return_code = check_error(_data, _size, its_instance);
            if(!(_size >= VSOMEIP_MESSAGE_TYPE_POS && utility::is_request_no_return(_data[VSOMEIP_MESSAGE_TYPE_POS]))) {
                if (return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) {
                    send_error(return_code, _data, _size, its_instance,
                            _receiver->is_reliable(), _receiver,
                            _remote_address, _remote_port);
                    return;
                }
            } else if(return_code != return_code_e::E_OK && return_code != return_code_e::E_NOT_OK) {
                //Ignore request no response message if an error occurred
                return;
            }

            // Security checks if enabled!
            // Only requests are checked: the service must be offered
            // remotely, the requester's client id must not be used by a
            // local client and remote clients must be allowed by the policy.
            if (security::get()->is_enabled()) {
                if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
                    client_t requester = VSOMEIP_BYTES_TO_WORD(
                            _data[VSOMEIP_CLIENT_POS_MIN],
                            _data[VSOMEIP_CLIENT_POS_MAX]);
                    its_method = VSOMEIP_BYTES_TO_WORD(
                               _data[VSOMEIP_METHOD_POS_MIN],
                               _data[VSOMEIP_METHOD_POS_MAX]);
                    if (!configuration_->is_offered_remote(its_service, its_instance)) {
                        VSOMEIP_WARNING << std::hex << "Security: Received a remote request "
                                << "for service/instance " << its_service << "/" << its_instance
                                << " which isn't offered remote ~> Skip message!";
                        return;
                    }
                    if (find_local(requester)) {
                        VSOMEIP_WARNING << std::hex << "Security: Received a remote request "
                                << "from client identifier 0x" << requester
                                << " which is already used locally ~> Skip message!";
                        return;
                    }
                    if (!security::get()->is_remote_client_allowed()) {
                        // check if policy allows remote requests.
                        VSOMEIP_WARNING << "routing_manager_impl::on_message: "
                                << std::hex << "Security: Remote client with client ID 0x" << requester
                                << " is not allowed to communicate with service/instance/method "
                                << its_service << "/" << its_instance
                                << "/" << its_method;
                        return;
                    }
                }
            }
            // E2E check (not compiled for ANDROID). A failed check is only
            // logged here; the resulting status is handed to the common
            // handler together with the message.
            if (e2e_provider_) {
                its_method = VSOMEIP_BYTES_TO_WORD(
                           _data[VSOMEIP_METHOD_POS_MIN],
                           _data[VSOMEIP_METHOD_POS_MAX]);
#ifndef ANDROID
                if (e2e_provider_->is_checked({its_service, its_method})) {
                    auto its_base = e2e_provider_->get_protection_base({its_service, its_method});
                    // NOTE(review): assumes its_base does not exceed _size -
                    // not verified here.
                    e2e_buffer its_buffer(_data + its_base, _data + _size);
                    e2e_provider_->check({its_service, its_method},
                            its_buffer, its_instance, its_check_status);

                    if (its_check_status != e2e::profile_interface::generic_check_status::E2E_OK) {
                        VSOMEIP_INFO << "E2E protection: CRC check failed for service: "
                                << std::hex << its_service << " method: " << its_method;
                    }
                }
#endif
            }
            // Common way of message handling
            // (with USE_DLT the result of the call below is stored in
            // is_forwarded, otherwise it is discarded)
#ifdef USE_DLT
            is_forwarded =
#endif
            on_message(its_service, its_instance, _data, _size, _receiver->is_reliable(),
                    _bound_client, _credentials, its_check_status, true);
        }
    }
#ifdef USE_DLT
    if (is_forwarded) {
        // The traced payload size is clamped to USHRT_MAX.
        const uint16_t its_data_size
            = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);

        trace::header its_header;
        // Remote addresses that are not IPv4 are replaced by the
        // placeholder address 6.6.6.6 in the trace header.
        const boost::asio::ip::address_v4 its_remote_address =
                _remote_address.is_v4() ? _remote_address.to_v4() :
                        boost::asio::ip::address_v4::from_string("6.6.6.6");
        trace::protocol_e its_protocol =
                _receiver->is_local() ? trace::protocol_e::local :
                _receiver->is_reliable() ? trace::protocol_e::tcp :
                    trace::protocol_e::udp;
        its_header.prepare(its_remote_address, _remote_port, its_protocol, false,
                its_instance);
        tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE, _data,
                its_data_size);
    }
#endif
}
1516 
on_message(service_t _service,instance_t _instance,const byte_t * _data,length_t _size,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _check_status,bool _is_from_remote)1517 bool routing_manager_impl::on_message(
1518         service_t _service, instance_t _instance,
1519         const byte_t *_data, length_t _size,
1520         bool _reliable, client_t _bound_client,
1521         credentials_t _credentials,
1522         uint8_t _check_status,
1523         bool _is_from_remote) {
1524 #if 0
1525     std::stringstream msg;
1526     msg << "rmi::on_message("
1527             << std::hex << std::setw(4) << std::setfill('0')
1528             << _service << ", " << _instance << "): ";
1529     for (uint32_t i = 0; i < _size; ++i)
1530         msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
1531     VSOMEIP_INFO << msg.str();
1532 #endif
1533     client_t its_client;
1534     bool is_forwarded(true);
1535 
1536     if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
1537         its_client = find_local_client(_service, _instance);
1538     } else {
1539         its_client = VSOMEIP_BYTES_TO_WORD(
1540                 _data[VSOMEIP_CLIENT_POS_MIN],
1541                 _data[VSOMEIP_CLIENT_POS_MAX]);
1542     }
1543 
1544     if (utility::is_notification(_data[VSOMEIP_MESSAGE_TYPE_POS])) {
1545         is_forwarded = deliver_notification(_service, _instance, _data, _size,
1546                 _reliable, _bound_client, _credentials, _check_status, _is_from_remote);
1547     } else if (its_client == host_->get_client()) {
1548         deliver_message(_data, _size, _instance,
1549                 _reliable, _bound_client, _credentials, _check_status, _is_from_remote);
1550     } else {
1551         send(its_client, _data, _size, _instance, _reliable,
1552                 _bound_client, _credentials, _check_status, _is_from_remote); //send to proxy
1553     }
1554     return is_forwarded;
1555 }
1556 
on_notification(client_t _client,service_t _service,instance_t _instance,const byte_t * _data,length_t _size,bool _notify_one)1557 void routing_manager_impl::on_notification(client_t _client,
1558         service_t _service, instance_t _instance,
1559         const byte_t *_data, length_t _size, bool _notify_one) {
1560     event_t its_event_id = VSOMEIP_BYTES_TO_WORD(
1561             _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
1562     std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id);
1563     if (its_event) {
1564         uint32_t its_length = utility::get_payload_size(_data, _size);
1565         std::shared_ptr<payload> its_payload =
1566                 runtime::get()->create_payload(
1567                                     &_data[VSOMEIP_PAYLOAD_POS],
1568                                     its_length);
1569 
1570         if (_notify_one) {
1571             notify_one(_service, _instance, its_event->get_event(),
1572                     its_payload, _client, true
1573 #ifdef VSOMEIP_ENABLE_COMPAT
1574                     , false
1575 #endif
1576                     );
1577         } else {
1578             if (its_event->is_field()) {
1579                 if (!its_event->set_payload_notify_pending(its_payload)) {
1580                     its_event->set_payload(its_payload, false);
1581                 }
1582             } else {
1583                  its_event->set_payload(its_payload, false, true);
1584             }
1585         }
1586     }
1587 }
1588 
is_last_stop_callback(const uint32_t _callback_id)1589 bool routing_manager_impl::is_last_stop_callback(const uint32_t _callback_id) {
1590     bool last_callback(false);
1591     auto found_id = callback_counts_.find(_callback_id);
1592     if (found_id != callback_counts_.end()) {
1593         found_id->second--;
1594         if (found_id->second == 0) {
1595             last_callback = true;
1596         }
1597     }
1598     if (last_callback) {
1599         callback_counts_.erase(_callback_id);
1600     }
1601     return last_callback;
1602 }
1603 
on_stop_offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)1604 void routing_manager_impl::on_stop_offer_service(client_t _client, service_t _service,
1605         instance_t _instance, major_version_t _major, minor_version_t _minor) {
1606     {
1607         std::lock_guard<std::mutex> its_lock(local_services_mutex_);
1608         auto found_service = local_services_.find(_service);
1609         if (found_service != local_services_.end()) {
1610             auto found_instance = found_service->second.find(_instance);
1611             if (found_instance != found_service->second.end()) {
1612                 if (   std::get<0>(found_instance->second) != _major
1613                     || std::get<1>(found_instance->second) != _minor
1614                     || std::get<2>(found_instance->second) != _client) {
1615                     VSOMEIP_WARNING
1616                             << "routing_manager_impl::on_stop_offer_service: "
1617                             << "trying to delete service not matching exactly "
1618                             << "the one offered previously: " << "[" << std::hex
1619                             << std::setw(4) << std::setfill('0') << _service
1620                             << "." << _instance << "." << std::dec
1621                             << static_cast<std::uint32_t>(_major)
1622                             << "." << _minor << "] by application: "
1623                             << std::hex << std::setw(4) << std::setfill('0')
1624                             << _client << ". Stored: [" << std::hex
1625                             << std::setw(4) << std::setfill('0') << _service
1626                             << "." << _instance << "." << std::dec
1627                             << static_cast<std::uint32_t>(std::get<0>(found_instance->second)) << "."
1628                             << std::get<1>(found_instance->second)
1629                             << "] by application: " << std::hex << std::setw(4)
1630                             << std::setfill('0') << std::get<2>(found_instance->second);
1631                 }
1632                 if (std::get<2>(found_instance->second) == _client) {
1633                     found_service->second.erase(_instance);
1634                     if (found_service->second.size() == 0) {
1635                         local_services_.erase(_service);
1636                     }
1637                 }
1638             }
1639         }
1640     }
1641 
1642     routing_manager_base::stop_offer_service(_client, _service, _instance,
1643             _major, _minor);
1644 
1645     /**
1646      * Hold reliable & unreliable server-endpoints from service info
1647      * because if "del_routing_info" is called those entries could be freed
1648      * and we can't be sure this happens synchronous when SD is active.
1649      * After triggering "del_routing_info" this endpoints gets cleanup up
1650      * within this method if they not longer used by any other local service.
1651      */
1652     std::shared_ptr<endpoint> its_reliable_endpoint;
1653     std::shared_ptr<endpoint> its_unreliable_endpoint;
1654     std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
1655     if (its_info) {
1656         its_reliable_endpoint = its_info->get_endpoint(true);
1657         its_unreliable_endpoint = its_info->get_endpoint(false);
1658         static std::atomic<uint32_t> callback_id(0);
1659         const uint32_t its_callback_id = ++callback_id;
1660 
1661         struct ready_to_stop_t {
1662             ready_to_stop_t() : reliable_(false), unreliable_(false){}
1663             std::atomic<bool> reliable_;
1664             std::atomic<bool> unreliable_;
1665         };
1666         auto ready_to_stop = std::make_shared<ready_to_stop_t>();
1667         auto ptr = shared_from_this();
1668 
1669         auto callback = [&, its_callback_id, ptr, its_info, its_reliable_endpoint, its_unreliable_endpoint,
1670                          ready_to_stop, _service, _instance, _major, _minor]
1671                          (std::shared_ptr<endpoint> _endpoint, service_t _stopped_service) {
1672             (void)_stopped_service;
1673             if (its_reliable_endpoint && its_reliable_endpoint == _endpoint) {
1674                 ready_to_stop->reliable_ = true;
1675             }
1676             if (its_unreliable_endpoint && its_unreliable_endpoint == _endpoint) {
1677                 ready_to_stop->unreliable_ = true;
1678             }
1679             if ((its_unreliable_endpoint && !ready_to_stop->unreliable_) ||
1680                 (its_reliable_endpoint && !ready_to_stop->reliable_)) {
1681                 {
1682                     std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
1683                     if (is_last_stop_callback(its_callback_id)) {
1684                         erase_offer_command(_service, _instance);
1685                     }
1686                 }
1687                 return;
1688             }
1689 
1690             if (discovery_) {
1691                 if (its_info->get_major() == _major && its_info->get_minor() == _minor) {
1692                     discovery_->stop_offer_service(its_info);
1693                 }
1694             }
1695             del_routing_info(_service, _instance, (its_reliable_endpoint != nullptr),
1696                     (its_unreliable_endpoint != nullptr));
1697 
1698             for (const auto& ep: {its_reliable_endpoint, its_unreliable_endpoint}) {
1699                 if (ep) {
1700                     if (ep_mgr_impl_->remove_instance(_service, ep.get())) {
1701                         {
1702                             std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
1703                             callback_counts_[its_callback_id]++;
1704                         }
1705                         // last instance -> pass ANY_INSTANCE and shutdown completely
1706                         ep->prepare_stop(
1707                             [&, _service, _instance, its_callback_id, ptr, its_reliable_endpoint, its_unreliable_endpoint]
1708                             (std::shared_ptr<endpoint> _endpoint,
1709                             service_t _stopped_service) {
1710                                 (void)_stopped_service;
1711                                 if (ep_mgr_impl_->remove_server_endpoint(
1712                                                 _endpoint->get_local_port(),
1713                                                 _endpoint->is_reliable())) {
1714                                     _endpoint->stop();
1715                                 }
1716                                 {
1717                                     std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
1718                                     if (is_last_stop_callback(its_callback_id)) {
1719                                         erase_offer_command(_service, _instance);
1720                                     }
1721                                 }
1722                             }, ANY_SERVICE);
1723                     }
1724                     // Clear service info and service group
1725                     clear_service_info(_service, _instance, ep->is_reliable());
1726                 }
1727             }
1728             {
1729                 std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
1730                 if (is_last_stop_callback(its_callback_id)) {
1731                     erase_offer_command(_service, _instance);
1732                 }
1733             }
1734         };
1735 
1736         // determine callback count
1737         for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) {
1738             if (ep) {
1739                 std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
1740                 auto found_id = callback_counts_.find(its_callback_id);
1741                 if (found_id != callback_counts_.end()) {
1742                     found_id->second++;
1743                 } else {
1744                     callback_counts_[its_callback_id] = 1;
1745                 }
1746             }
1747         }
1748         for (const auto& ep : {its_reliable_endpoint, its_unreliable_endpoint}) {
1749             if (ep) {
1750                 ep->prepare_stop(callback, _service);
1751             }
1752         }
1753 
1754         if (!its_reliable_endpoint && !its_unreliable_endpoint) {
1755             {
1756                 std::lock_guard<std::mutex> its_lock(callback_counts_mutex_);
1757                 callback_counts_.erase(its_callback_id);
1758             }
1759             erase_offer_command(_service, _instance);
1760         }
1761 
1762         std::set<std::shared_ptr<eventgroupinfo> > its_eventgroup_info_set;
1763         {
1764             std::lock_guard<std::mutex> its_eventgroups_lock(eventgroups_mutex_);
1765             auto find_service = eventgroups_.find(_service);
1766             if (find_service != eventgroups_.end()) {
1767                 auto find_instance = find_service->second.find(_instance);
1768                 if (find_instance != find_service->second.end()) {
1769                     for (auto e : find_instance->second) {
1770                         its_eventgroup_info_set.insert(e.second);
1771                     }
1772                 }
1773             }
1774         }
1775 
1776         for (auto e : its_eventgroup_info_set) {
1777             e->clear_remote_subscriptions();
1778         }
1779     } else {
1780         erase_offer_command(_service, _instance);
1781     }
1782 }
1783 
deliver_message(const byte_t * _data,length_t _size,instance_t _instance,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _status_check,bool _is_from_remote)1784 bool routing_manager_impl::deliver_message(const byte_t *_data, length_t _size,
1785         instance_t _instance, bool _reliable, client_t _bound_client, credentials_t _credentials,
1786         uint8_t _status_check, bool _is_from_remote) {
1787     bool is_delivered(false);
1788     std::uint32_t its_sender_uid = std::get<0>(_credentials);
1789     std::uint32_t its_sender_gid = std::get<1>(_credentials);
1790 
1791     auto its_deserializer = get_deserializer();
1792     its_deserializer->set_data(_data, _size);
1793     std::shared_ptr<message_impl> its_message(its_deserializer->deserialize_message());
1794     its_deserializer->reset();
1795     put_deserializer(its_deserializer);
1796 
1797     if (its_message) {
1798         its_message->set_instance(_instance);
1799         its_message->set_reliable(_reliable);
1800         its_message->set_check_result(_status_check);
1801         its_message->set_uid(std::get<0>(_credentials));
1802         its_message->set_gid(std::get<1>(_credentials));
1803 
1804         if (!_is_from_remote) {
1805             if (utility::is_notification(its_message->get_message_type())) {
1806                 if (!is_response_allowed(_bound_client, its_message->get_service(),
1807                         its_message->get_instance(), its_message->get_method())) {
1808                     VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1809                             << " : routing_manager_impl::deliver_message: "
1810                             << std::hex << " received a notification from client 0x" << _bound_client
1811                             << " which does not offer service/instance/event "
1812                             << its_message->get_service() << "/" << its_message->get_instance()
1813                             << "/" << its_message->get_method()
1814                             << " ~> Skip message!";
1815                     return false;
1816                 } else {
1817                     if (!security::get()->is_client_allowed(own_uid_, own_gid_,
1818                             get_client(), its_message->get_service(),
1819                             its_message->get_instance(), its_message->get_method())) {
1820                         VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1821                                 << " : routing_manager_impl::deliver_message: "
1822                                 << " isn't allowed to receive a notification from service/instance/event "
1823                                 << its_message->get_service() << "/" << its_message->get_instance()
1824                                 << "/" << its_message->get_method()
1825                                 << " respectively from client 0x" << _bound_client
1826                                 << " ~> Skip message!";
1827                         return false;
1828                     }
1829                 }
1830             } else if (utility::is_request(its_message->get_message_type())) {
1831                 if (security::get()->is_enabled()
1832                         && its_message->get_client() != _bound_client) {
1833                     VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1834                             << " : routing_manager_impl::deliver_message:"
1835                             << " received a request from client 0x" << std::setw(4) << std::setfill('0')
1836                             << its_message->get_client() << " to service/instance/method "
1837                             << its_message->get_service() << "/" << its_message->get_instance()
1838                             << "/" << its_message->get_method() << " which doesn't match the bound client 0x"
1839                             << std::setw(4) << std::setfill('0') << _bound_client
1840                             << " ~> Skip message!";
1841                     return false;
1842                 }
1843 
1844                 if (!security::get()->is_client_allowed(its_sender_uid, its_sender_gid,
1845                         its_message->get_client(), its_message->get_service(),
1846                         its_message->get_instance(), its_message->get_method())) {
1847                     VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1848                             << " : routing_manager_impl::deliver_message: "
1849                             << " isn't allowed to send a request to service/instance/method "
1850                             << its_message->get_service() << "/" << its_message->get_instance()
1851                             << "/" << its_message->get_method()
1852                             << " ~> Skip message!";
1853                     return false;
1854                 }
1855             } else { // response
1856                 if (!is_response_allowed(_bound_client, its_message->get_service(),
1857                         its_message->get_instance(), its_message->get_method())) {
1858                     VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1859                             << " : routing_manager_impl::deliver_message: "
1860                             << " received a response from client 0x" << _bound_client
1861                             << " which does not offer service/instance/method "
1862                             << its_message->get_service() << "/" << its_message->get_instance()
1863                             << "/" << its_message->get_method()
1864                             << " ~> Skip message!";
1865                     return false;
1866                 } else {
1867                     if (!security::get()->is_client_allowed(own_uid_, own_gid_,
1868                             get_client(), its_message->get_service(),
1869                             its_message->get_instance(), its_message->get_method())) {
1870                         VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1871                                 << " : routing_manager_impl::deliver_message: "
1872                                 << " isn't allowed to receive a response from service/instance/method "
1873                                 << its_message->get_service() << "/" << its_message->get_instance()
1874                                 << "/" << its_message->get_method()
1875                                 << " respectively from client 0x" << _bound_client
1876                                 << " ~> Skip message!";
1877                         return false;
1878                     }
1879                 }
1880             }
1881         } else {
1882             if (!security::get()->is_remote_client_allowed()) {
1883                 // if the message is from remote, check if
1884                 // policy allows remote requests.
1885                 VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1886                         << " : routing_manager_impl::deliver_message: "
1887                         << std::hex << "Remote clients are not allowed"
1888                         << " to communicate with service/instance/method "
1889                         << its_message->get_service() << "/" << its_message->get_instance()
1890                         << "/" << its_message->get_method()
1891                         << " respectively with client 0x" << get_client()
1892                         << " ~> Skip message!";
1893                 return false;
1894             } else if (utility::is_notification(its_message->get_message_type())) {
1895                 if (!security::get()->is_client_allowed(own_uid_, own_gid_,
1896                         get_client(), its_message->get_service(),
1897                         its_message->get_instance(), its_message->get_method())) {
1898                     VSOMEIP_WARNING << "vSomeIP Security: Client 0x" << std::hex << get_client()
1899                             << " : routing_manager_impl::deliver_message: "
1900                             << " isn't allowed to receive a notification from service/instance/event "
1901                             << its_message->get_service() << "/" << its_message->get_instance()
1902                             << "/" << its_message->get_method()
1903                             << " respectively from remote client"
1904                             << " ~> Skip message!";
1905                     return false;
1906                 }
1907             }
1908         }
1909 
1910         host_->on_message(std::move(its_message));
1911         is_delivered = true;
1912     } else {
1913         VSOMEIP_ERROR << "Routing manager: deliver_message: "
1914                       << "SomeIP-Header deserialization failed!";
1915     }
1916     return is_delivered;
1917 }
1918 
deliver_notification(service_t _service,instance_t _instance,const byte_t * _data,length_t _length,bool _reliable,client_t _bound_client,credentials_t _credentials,uint8_t _status_check,bool _is_from_remote)1919 bool routing_manager_impl::deliver_notification(
1920         service_t _service, instance_t _instance,
1921         const byte_t *_data, length_t _length,
1922         bool _reliable, client_t _bound_client,
1923         credentials_t _credentials,
1924         uint8_t _status_check, bool _is_from_remote) {
1925     event_t its_event_id = VSOMEIP_BYTES_TO_WORD(
1926             _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
1927     client_t its_client_id = VSOMEIP_BYTES_TO_WORD(
1928             _data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]);
1929 
1930     std::shared_ptr<event> its_event = find_event(_service, _instance, its_event_id);
1931     if (its_event) {
1932         if (!its_event->is_provided()) {
1933             if (its_event->get_subscribers().size() == 0) {
1934                 // no subscribers for this specific event / check subscriptions
1935                 // to other events of the event's eventgroups
1936                 bool cache_event = false;
1937                 for (const auto& eg : its_event->get_eventgroups()) {
1938                     std::shared_ptr<eventgroupinfo> egi = find_eventgroup(_service, _instance, eg);
1939                     if (egi) {
1940                         for (const auto &e : egi->get_events()) {
1941                             cache_event = (e->get_subscribers().size() > 0);
1942                             if (cache_event) {
1943                                 break;
1944                             }
1945                         }
1946                         if (cache_event) {
1947                             break;
1948                         }
1949                     }
1950                 }
1951                 if (!cache_event) {
1952                     VSOMEIP_WARNING << __func__ << ": dropping ["
1953                             << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1954                             << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
1955                             << std::hex << std::setw(4) << std::setfill('0') << its_event_id
1956                             << "]. No subscription to corresponding eventgroup.";
1957                     return true; // as there is nothing to do
1958                 }
1959             }
1960             const uint32_t its_length(utility::get_payload_size(_data, _length));
1961             if (its_length != _length - VSOMEIP_FULL_HEADER_SIZE) {
1962                 VSOMEIP_ERROR << "Message length mismatch, dropping message!";
1963                 return false;
1964             }
1965             std::shared_ptr<payload> its_payload
1966                 = runtime::get()->create_payload(&_data[VSOMEIP_PAYLOAD_POS],
1967                                                  its_length);
1968             if (!its_event->set_payload_dont_notify(its_payload)) {
1969                 // do not forward the notification as it was filtered
1970                 return false;
1971             }
1972         }
1973 
1974         // incoming events statistics
1975         (void) insert_event_statistics(
1976                 _service,
1977                 _instance,
1978                 its_event_id,
1979                 utility::get_payload_size(_data, _length));
1980 
1981         if (its_event->get_type() != event_type_e::ET_SELECTIVE_EVENT) {
1982             for (const auto& its_local_client : its_event->get_subscribers()) {
1983                 if (its_local_client == host_->get_client()) {
1984                     deliver_message(_data, _length, _instance, _reliable,
1985                             _bound_client, _credentials, _status_check, _is_from_remote);
1986                 } else {
1987                     std::shared_ptr<endpoint> its_local_target = find_local(its_local_client);
1988                     if (its_local_target) {
1989                         send_local(its_local_target, VSOMEIP_ROUTING_CLIENT,
1990                                 _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check);
1991                     }
1992                 }
1993             }
1994         } else {
1995             // TODO: Check whether it makes more sense to set the client id
1996             // for internal selective events. This would create some extra
1997             // effort but we could avoid this hack.
1998             if (its_client_id == VSOMEIP_ROUTING_CLIENT)
1999                 its_client_id = get_client();
2000 
2001             auto its_subscribers = its_event->get_subscribers();
2002             if (its_subscribers.find(its_client_id) != its_subscribers.end()) {
2003                 if (its_client_id == host_->get_client()) {
2004                     deliver_message(_data, _length, _instance, _reliable,
2005                             _bound_client, _credentials, _status_check, _is_from_remote);
2006                 } else {
2007                     std::shared_ptr<endpoint> its_local_target = find_local(its_client_id);
2008                     if (its_local_target) {
2009                         send_local(its_local_target, VSOMEIP_ROUTING_CLIENT,
2010                                 _data, _length, _instance, _reliable, VSOMEIP_SEND, _status_check);
2011                     }
2012                 }
2013             }
2014         }
2015     } else {
2016         VSOMEIP_WARNING << __func__ << ": Event ["
2017                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2018                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2019                 << std::hex << std::setw(4) << std::setfill('0') << its_event_id << "]"
2020                 << " is not registered. The message is dropped.";
2021     }
2022 
2023     return true;
2024 }
2025 
find_eventgroup(service_t _service,instance_t _instance,eventgroup_t _eventgroup) const2026 std::shared_ptr<eventgroupinfo> routing_manager_impl::find_eventgroup(
2027         service_t _service, instance_t _instance,
2028         eventgroup_t _eventgroup) const {
2029     return routing_manager_base::find_eventgroup(_service, _instance, _eventgroup);
2030 }
2031 
create_service_discovery_endpoint(const std::string & _address,uint16_t _port,bool _reliable)2032 std::shared_ptr<endpoint> routing_manager_impl::create_service_discovery_endpoint(
2033         const std::string &_address, uint16_t _port, bool _reliable) {
2034     std::shared_ptr<endpoint> its_service_endpoint =
2035             ep_mgr_impl_->find_server_endpoint(_port, _reliable);
2036     if (!its_service_endpoint) {
2037         try {
2038             its_service_endpoint =
2039                     ep_mgr_impl_->create_server_endpoint(_port,
2040                             _reliable, true);
2041 
2042             if (its_service_endpoint) {
2043                 sd_info_ = std::make_shared<serviceinfo>(
2044                         VSOMEIP_SD_SERVICE, VSOMEIP_SD_INSTANCE,
2045                         ANY_MAJOR, ANY_MINOR, DEFAULT_TTL,
2046                         false); // false, because we do _not_ want to announce it...
2047                 sd_info_->set_endpoint(its_service_endpoint, _reliable);
2048                 its_service_endpoint->add_default_target(VSOMEIP_SD_SERVICE,
2049                         _address, _port);
2050                 if (!_reliable) {
2051                     auto its_udp_server_endpoint_impl = std::dynamic_pointer_cast<
2052                             udp_server_endpoint_impl>(its_service_endpoint);
2053                     if (its_udp_server_endpoint_impl)
2054                         its_udp_server_endpoint_impl->join(_address);
2055                 }
2056             } else {
2057                 VSOMEIP_ERROR<< "Service Discovery endpoint could not be created. "
2058                 "Please check your network configuration.";
2059             }
2060         } catch (const std::exception &e) {
2061             VSOMEIP_ERROR << "Server endpoint creation failed: Service "
2062                     "Discovery endpoint could not be created: " << e.what();
2063         }
2064     }
2065     return its_service_endpoint;
2066 }
2067 
get_offered_services() const2068 services_t routing_manager_impl::get_offered_services() const {
2069     services_t its_services;
2070     for (const auto& s : get_services()) {
2071         for (const auto& i : s.second) {
2072             if (i.second->is_local()) {
2073                 its_services[s.first][i.first] = i.second;
2074             }
2075         }
2076     }
2077     return its_services;
2078 }
2079 
get_offered_service(service_t _service,instance_t _instance) const2080 std::shared_ptr<serviceinfo> routing_manager_impl::get_offered_service(
2081         service_t _service, instance_t _instance) const {
2082     std::shared_ptr<serviceinfo> its_info;
2083     its_info = find_service(_service, _instance);
2084     if (its_info && !its_info->is_local()) {
2085         its_info.reset();
2086     }
2087     return its_info;
2088 }
2089 
2090 std::map<instance_t, std::shared_ptr<serviceinfo>>
get_offered_service_instances(service_t _service) const2091 routing_manager_impl::get_offered_service_instances(service_t _service) const {
2092     std::map<instance_t, std::shared_ptr<serviceinfo>> its_instances;
2093     const services_t its_services(get_services());
2094     const auto found_service = its_services.find(_service);
2095     if (found_service != its_services.end()) {
2096         for (const auto& i : found_service->second) {
2097             if (i.second->is_local()) {
2098                 its_instances[i.first] = i.second;
2099             }
2100         }
2101     }
2102     return its_instances;
2103 }
2104 
2105 ///////////////////////////////////////////////////////////////////////////////
2106 // PRIVATE
2107 ///////////////////////////////////////////////////////////////////////////////
init_service_info(service_t _service,instance_t _instance,bool _is_local_service)2108 void routing_manager_impl::init_service_info(
2109         service_t _service, instance_t _instance, bool _is_local_service) {
2110     std::shared_ptr<serviceinfo> its_info = find_service(_service, _instance);
2111     if (!its_info) {
2112         VSOMEIP_ERROR << "routing_manager_impl::init_service_info: couldn't "
2113                 "find serviceinfo for service: ["
2114                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2115                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"
2116                 << " is_local_service=" << _is_local_service;
2117         return;
2118     }
2119     if (configuration_) {
2120         // Create server endpoints for local services only
2121         if (_is_local_service) {
2122             const bool is_someip = configuration_->is_someip(_service, _instance);
2123             uint16_t its_reliable_port = configuration_->get_reliable_port(
2124                     _service, _instance);
2125             bool _is_found(false);
2126             if (ILLEGAL_PORT != its_reliable_port) {
2127                 std::shared_ptr<endpoint> its_reliable_endpoint  =
2128                         ep_mgr_impl_->find_or_create_server_endpoint(
2129                                 its_reliable_port, true, is_someip, _service,
2130                                 _instance, _is_found);
2131                 if (its_reliable_endpoint) {
2132                     its_info->set_endpoint(its_reliable_endpoint, true);
2133                 }
2134             }
2135             uint16_t its_unreliable_port = configuration_->get_unreliable_port(
2136                     _service, _instance);
2137             if (ILLEGAL_PORT != its_unreliable_port) {
2138                 std::shared_ptr<endpoint> its_unreliable_endpoint =
2139                         ep_mgr_impl_->find_or_create_server_endpoint(
2140                                 its_unreliable_port, false, is_someip, _service,
2141                                 _instance, _is_found);
2142                 if (its_unreliable_endpoint) {
2143                     its_info->set_endpoint(its_unreliable_endpoint, false);
2144                 }
2145             }
2146 
2147             if (ILLEGAL_PORT == its_reliable_port
2148                    && ILLEGAL_PORT == its_unreliable_port) {
2149                    VSOMEIP_INFO << "Port configuration missing for ["
2150                            << std::hex << _service << "." << _instance
2151                            << "]. Service is internal.";
2152             }
2153         }
2154     } else {
2155         VSOMEIP_ERROR << "Missing vsomeip configuration.";
2156     }
2157 }
2158 
remove_local(client_t _client,bool _remove_uid)2159 void routing_manager_impl::remove_local(client_t _client, bool _remove_uid) {
2160     auto clients_subscriptions = get_subscriptions(_client);
2161     {
2162         std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
2163         for (const auto& s : clients_subscriptions) {
2164             remote_subscription_state_.erase(std::tuple_cat(s, std::make_tuple(_client)));
2165         }
2166     }
2167     routing_manager_base::remove_local(_client, clients_subscriptions, _remove_uid);
2168 
2169     std::forward_list<std::pair<service_t, instance_t>> services_to_release_;
2170     {
2171         std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2172         auto its_client = requested_services_.find(_client);
2173         if (its_client != requested_services_.end()) {
2174             for (const auto& its_service : its_client->second) {
2175                 for (const auto& its_instance : its_service.second) {
2176                     services_to_release_.push_front(
2177                         { its_service.first, its_instance.first });
2178                 }
2179             }
2180         }
2181     }
2182     for (const auto &s : services_to_release_) {
2183         release_service(_client, s.first, s.second);
2184     }
2185 }
2186 
is_field(service_t _service,instance_t _instance,event_t _event) const2187 bool routing_manager_impl::is_field(service_t _service, instance_t _instance,
2188         event_t _event) const {
2189     std::lock_guard<std::mutex> its_lock(events_mutex_);
2190     auto find_service = events_.find(_service);
2191     if (find_service != events_.end()) {
2192         auto find_instance = find_service->second.find(_instance);
2193         if (find_instance != find_service->second.end()) {
2194             auto find_event = find_instance->second.find(_event);
2195             if (find_event != find_instance->second.end())
2196                 return find_event->second->is_field();
2197         }
2198     }
2199     return false;
2200 }
2201 
2202 //only called from the SD
add_routing_info(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,ttl_t _ttl,const boost::asio::ip::address & _reliable_address,uint16_t _reliable_port,const boost::asio::ip::address & _unreliable_address,uint16_t _unreliable_port)2203 void routing_manager_impl::add_routing_info(
2204         service_t _service, instance_t _instance,
2205         major_version_t _major, minor_version_t _minor, ttl_t _ttl,
2206         const boost::asio::ip::address &_reliable_address,
2207         uint16_t _reliable_port,
2208         const boost::asio::ip::address &_unreliable_address,
2209         uint16_t _unreliable_port) {
2210 
2211     std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
2212     if (routing_state_ == routing_state_e::RS_SUSPENDED) {
2213         VSOMEIP_INFO << "rmi::" << __func__ << " We are suspened --> do nothing.";
2214         return;
2215     }
2216 
2217     // Create/Update service info
2218     std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
2219     if (!its_info) {
2220         boost::asio::ip::address its_unicast_address
2221             = configuration_->get_unicast_address();
2222         bool is_local(false);
2223         if (_reliable_port != ILLEGAL_PORT
2224                 && its_unicast_address == _reliable_address)
2225             is_local = true;
2226         else if (_unreliable_port != ILLEGAL_PORT
2227                 && its_unicast_address == _unreliable_address)
2228             is_local = true;
2229 
2230         its_info = create_service_info(_service, _instance, _major, _minor, _ttl, is_local);
2231         init_service_info(_service, _instance, is_local);
2232     } else if (its_info->is_local()) {
2233         // We received a service info for a service which is already offered locally
2234         VSOMEIP_ERROR << "routing_manager_impl::add_routing_info: "
2235             << "rejecting routing info. Remote: "
2236             << ((_reliable_port != ILLEGAL_PORT) ? _reliable_address.to_string()
2237                     : _unreliable_address.to_string()) << " is trying to offer ["
2238             << std::hex << std::setfill('0') << std::setw(4) << _service << "."
2239             << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
2240             << std::dec << static_cast<std::uint32_t>(_major) << "."
2241             << std::dec << _minor
2242             << "] on port " << ((_reliable_port != ILLEGAL_PORT) ? _reliable_port
2243                     : _unreliable_port) << " offered previously on this node: ["
2244             << std::hex << std::setfill('0') << std::setw(4) << _service << "."
2245             << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
2246             << std::dec << static_cast<std::uint32_t>(its_info->get_major())
2247             << "." << its_info->get_minor() << "]";
2248         return;
2249     } else {
2250         its_info->set_ttl(_ttl);
2251     }
2252 
2253     // Check whether remote services are unchanged
2254     bool is_reliable_known(false);
2255     bool is_unreliable_known(false);
2256     ep_mgr_impl_->is_remote_service_known(_service, _instance, _major,
2257             _minor, _reliable_address, _reliable_port, &is_reliable_known,
2258             _unreliable_address, _unreliable_port, &is_unreliable_known);
2259 
2260     bool udp_inserted(false);
2261     bool tcp_inserted(false);
2262     // Add endpoint(s) if necessary
2263     if (_reliable_port != ILLEGAL_PORT && !is_reliable_known) {
2264         std::shared_ptr<endpoint_definition> endpoint_def_tcp
2265             = endpoint_definition::get(_reliable_address, _reliable_port, true, _service, _instance);
2266         if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) {
2267             std::shared_ptr<endpoint_definition> endpoint_def_udp
2268                 = endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance);
2269             ep_mgr_impl_->add_remote_service_info(_service, _instance,
2270                     endpoint_def_tcp, endpoint_def_udp);
2271             udp_inserted = true;
2272             tcp_inserted = true;
2273         } else {
2274             ep_mgr_impl_->add_remote_service_info(_service, _instance,
2275                     endpoint_def_tcp);
2276             tcp_inserted = true;
2277         }
2278 
2279         // check if service was requested and establish TCP connection if necessary
2280         {
2281             bool connected(false);
2282             std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2283             for(const auto &client_id : requested_services_) {
2284                 auto found_service = client_id.second.find(_service);
2285                 if (found_service != client_id.second.end()) {
2286                     auto found_instance = found_service->second.find(_instance);
2287                     if (found_instance != found_service->second.end()) {
2288                         for (const auto &major_minor_pair : found_instance->second) {
2289                             if ((major_minor_pair.first == _major
2290                                     || _major == DEFAULT_MAJOR
2291                                     || major_minor_pair.first == ANY_MAJOR)
2292                                     && (major_minor_pair.second <= _minor
2293                                             || _minor == DEFAULT_MINOR
2294                                             || major_minor_pair.second == ANY_MINOR)) {
2295                                 // SWS_SD_00376 establish TCP connection to service
2296                                 // service is marked as available later in on_connect()
2297                                 if(!connected) {
2298                                     if (udp_inserted) {
2299                                         // atomically create reliable and unreliable endpoint
2300                                         ep_mgr_impl_->find_or_create_remote_client(
2301                                                 _service, _instance);
2302                                     } else {
2303                                         ep_mgr_impl_->find_or_create_remote_client(
2304                                                 _service, _instance, true);
2305                                     }
2306                                     connected = true;
2307                                 }
2308                                 its_info->add_client(client_id.first);
2309                                 break;
2310                             }
2311                         }
2312                     }
2313                 }
2314             }
2315         }
2316     } else if (_reliable_port != ILLEGAL_PORT && is_reliable_known) {
2317         std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2318         bool connected(false);
2319         for(const auto &client_id : requested_services_) {
2320             auto found_service = client_id.second.find(_service);
2321             if (found_service != client_id.second.end()) {
2322                 auto found_instance = found_service->second.find(_instance);
2323                 if (found_instance != found_service->second.end()) {
2324                     for (const auto &major_minor_pair : found_instance->second) {
2325                         if ((major_minor_pair.first == _major
2326                                 || _major == DEFAULT_MAJOR
2327                                 || major_minor_pair.first == ANY_MAJOR)
2328                                 && (major_minor_pair.second <= _minor
2329                                         || _minor == DEFAULT_MINOR
2330                                         || major_minor_pair.second == ANY_MINOR)) {
2331                             std::shared_ptr<endpoint> ep = its_info->get_endpoint(true);
2332                             if (ep) {
2333                                 if (ep->is_established() &&
2334                                     !stub_->contained_in_routing_info(
2335                                     VSOMEIP_ROUTING_CLIENT, _service, _instance,
2336                                     its_info->get_major(),
2337                                     its_info->get_minor())) {
2338                                     on_availability(_service, _instance,
2339                                             true, its_info->get_major(), its_info->get_minor());
2340                                     stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
2341                                             _service, _instance,
2342                                             its_info->get_major(),
2343                                             its_info->get_minor());
2344                                     if (discovery_) {
2345                                         discovery_->on_endpoint_connected(
2346                                                 _service, _instance, ep);
2347                                     }
2348                                 }
2349                             } else {
2350                                 // no endpoint yet, but requested -> create one
2351 
2352                                 // SWS_SD_00376 establish TCP connection to service
2353                                 // service is marked as available later in on_connect()
2354                                 if (!connected) {
2355                                     ep_mgr_impl_->find_or_create_remote_client(
2356                                             _service, _instance, true);
2357                                     connected = true;
2358                                 }
2359                                 its_info->add_client(client_id.first);
2360                             }
2361                             break;
2362                         }
2363                     }
2364                 }
2365             }
2366         }
2367     }
2368 
2369     if (_unreliable_port != ILLEGAL_PORT && !is_unreliable_known) {
2370         if (!udp_inserted) {
2371             std::shared_ptr<endpoint_definition> endpoint_def
2372                 = endpoint_definition::get(_unreliable_address, _unreliable_port, false, _service, _instance);
2373             ep_mgr_impl_->add_remote_service_info(_service, _instance, endpoint_def);
2374             // check if service was requested and increase requester count if necessary
2375             {
2376                 bool connected(false);
2377                 std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2378                 for (const auto &client_id : requested_services_) {
2379                     const auto found_service = client_id.second.find(_service);
2380                     if (found_service != client_id.second.end()) {
2381                         const auto found_instance = found_service->second.find(
2382                                 _instance);
2383                         if (found_instance != found_service->second.end()) {
2384                             for (const auto &major_minor_pair : found_instance->second) {
2385                                 if ((major_minor_pair.first == _major
2386                                         || _major == DEFAULT_MAJOR
2387                                         || major_minor_pair.first == ANY_MAJOR)
2388                                         && (major_minor_pair.second <= _minor
2389                                                 || _minor == DEFAULT_MINOR
2390                                                 || major_minor_pair.second
2391                                                         == ANY_MINOR)) {
2392                                     if(!connected) {
2393                                         ep_mgr_impl_->find_or_create_remote_client(_service, _instance,
2394                                                 false);
2395                                         connected = true;
2396                                     }
2397                                     its_info->add_client(client_id.first);
2398                                     break;
2399                                 }
2400                             }
2401                         }
2402                     }
2403                 }
2404             }
2405         }
2406         if (!is_reliable_known && !tcp_inserted) {
2407             // UDP only service can be marked as available instantly
2408             on_availability(_service, _instance, true, _major, _minor);
2409             stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance, _major, _minor);
2410         }
2411         if (discovery_) {
2412             std::shared_ptr<endpoint> ep = its_info->get_endpoint(false);
2413             if (ep && ep->is_established()) {
2414                 discovery_->on_endpoint_connected(_service, _instance, ep);
2415             }
2416         }
2417     } else if (_unreliable_port != ILLEGAL_PORT && is_unreliable_known) {
2418         std::lock_guard<std::mutex> its_lock(requested_services_mutex_);
2419         for(const auto &client_id : requested_services_) {
2420             auto found_service = client_id.second.find(_service);
2421             if (found_service != client_id.second.end()) {
2422                 auto found_instance = found_service->second.find(_instance);
2423                 if (found_instance != found_service->second.end()) {
2424                     for (const auto &major_minor_pair : found_instance->second) {
2425                         if ((major_minor_pair.first == _major
2426                                 || _major == DEFAULT_MAJOR
2427                                 || major_minor_pair.first == ANY_MAJOR)
2428                                 && (major_minor_pair.second <= _minor
2429                                         || _minor == DEFAULT_MINOR
2430                                         || major_minor_pair.second == ANY_MINOR)) {
2431                             if (_reliable_port == ILLEGAL_PORT && !is_reliable_known &&
2432                                     !stub_->contained_in_routing_info(
2433                                     VSOMEIP_ROUTING_CLIENT, _service, _instance,
2434                                     its_info->get_major(),
2435                                     its_info->get_minor())) {
2436                                 on_availability(_service, _instance,
2437                                         true, its_info->get_major(), its_info->get_minor());
2438                                 stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT,
2439                                         _service, _instance,
2440                                         its_info->get_major(),
2441                                         its_info->get_minor());
2442                                 if (discovery_) {
2443                                     std::shared_ptr<endpoint> ep = its_info->get_endpoint(false);
2444                                     if (ep && ep->is_established()) {
2445                                         discovery_->on_endpoint_connected(
2446                                                 _service, _instance,
2447                                                 ep);
2448                                     }
2449                                 }
2450                             }
2451                             break;
2452                         }
2453                     }
2454                 }
2455             }
2456         }
2457     }
2458 }
2459 
del_routing_info(service_t _service,instance_t _instance,bool _has_reliable,bool _has_unreliable)2460 void routing_manager_impl::del_routing_info(service_t _service, instance_t _instance,
2461         bool _has_reliable, bool _has_unreliable) {
2462 
2463     std::shared_ptr<serviceinfo> its_info(find_service(_service, _instance));
2464     if(!its_info)
2465         return;
2466 
2467     on_availability(_service, _instance, false,
2468             its_info->get_major(), its_info->get_minor());
2469     stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
2470             its_info->get_major(), its_info->get_minor());
2471     // Implicit unsubscribe
2472 
2473     std::vector<std::shared_ptr<event>> its_events;
2474     {
2475         std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
2476         auto found_service = eventgroups_.find(_service);
2477         if (found_service != eventgroups_.end()) {
2478             auto found_instance = found_service->second.find(_instance);
2479             if (found_instance != found_service->second.end()) {
2480                 for (auto &its_eventgroup : found_instance->second) {
2481                     // As the service is gone, all subscriptions to its events
2482                     // do no longer exist and the last received payload is no
2483                     // longer valid.
2484                     for (auto &its_event : its_eventgroup.second->get_events()) {
2485                         const auto& its_subscribers = its_event->get_subscribers();
2486                         for (const auto its_subscriber : its_subscribers) {
2487                             if (its_subscriber != get_client()) {
2488                                 its_event->remove_subscriber(
2489                                         its_eventgroup.first, its_subscriber);
2490                             }
2491                         }
2492                         its_events.push_back(its_event);
2493                     }
2494                 }
2495             }
2496         }
2497     }
2498     for (const auto& e : its_events) {
2499         e->unset_payload(true);
2500     }
2501 
2502     {
2503         std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
2504         std::set<std::tuple<
2505             service_t, instance_t, eventgroup_t, client_t> > its_invalid;
2506 
2507         for (const auto &its_state : remote_subscription_state_) {
2508             if (std::get<0>(its_state.first) == _service
2509                     && std::get<1>(its_state.first) == _instance) {
2510                 its_invalid.insert(its_state.first);
2511             }
2512         }
2513 
2514         for (const auto &its_key : its_invalid)
2515             remote_subscription_state_.erase(its_key);
2516     }
2517 
2518     {
2519         std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
2520         auto found_service = remote_subscribers_.find(_service);
2521         if (found_service != remote_subscribers_.end()) {
2522             if (found_service->second.erase(_instance) > 0 &&
2523                     !found_service->second.size()) {
2524                 remote_subscribers_.erase(found_service);
2525             }
2526         }
2527     }
2528 
2529     if (_has_reliable) {
2530         ep_mgr_impl_->clear_client_endpoints(_service, _instance, true);
2531         ep_mgr_impl_->clear_remote_service_info(_service, _instance, true);
2532     }
2533     if (_has_unreliable) {
2534         ep_mgr_impl_->clear_client_endpoints(_service, _instance, false);
2535         ep_mgr_impl_->clear_remote_service_info(_service, _instance, false);
2536     }
2537 
2538     ep_mgr_impl_->clear_multicast_endpoints(_service, _instance);
2539 
2540     if (_has_reliable)
2541         clear_service_info(_service, _instance, true);
2542     if (_has_unreliable)
2543         clear_service_info(_service, _instance, false);
2544 
2545     // For expired services using only unreliable endpoints that have never been created before
2546     if (!_has_reliable && !_has_unreliable) {
2547         ep_mgr_impl_->clear_remote_service_info(_service, _instance, true);
2548         ep_mgr_impl_->clear_remote_service_info(_service, _instance, false);
2549         clear_service_info(_service, _instance, true);
2550         clear_service_info(_service, _instance, false);
2551     }
2552 }
2553 
update_routing_info(std::chrono::milliseconds _elapsed)2554 void routing_manager_impl::update_routing_info(std::chrono::milliseconds _elapsed) {
2555     std::map<service_t, std::vector<instance_t> > its_expired_offers;
2556 
2557     {
2558         std::lock_guard<std::mutex> its_lock(services_remote_mutex_);
2559         for (const auto &s : services_remote_) {
2560             for (const auto &i : s.second) {
2561                 ttl_t its_ttl = i.second->get_ttl();
2562                 if (its_ttl < DEFAULT_TTL) { // do not touch "forever"
2563                     std::chrono::milliseconds precise_ttl = i.second->get_precise_ttl();
2564                     if (precise_ttl.count() < _elapsed.count() || precise_ttl.count() == 0) {
2565                         i.second->set_ttl(0);
2566                         its_expired_offers[s.first].push_back(i.first);
2567                     } else {
2568                         std::chrono::milliseconds its_new_ttl(precise_ttl - _elapsed);
2569                         i.second->set_precise_ttl(its_new_ttl);
2570                     }
2571                 }
2572             }
2573         }
2574     }
2575 
2576     for (const auto &s : its_expired_offers) {
2577         for (const auto &i : s.second) {
2578             if (discovery_) {
2579                 discovery_->unsubscribe_all(s.first, i);
2580             }
2581             del_routing_info(s.first, i, true, true);
2582             VSOMEIP_INFO << "update_routing_info: elapsed=" << _elapsed.count()
2583                     << " : delete service/instance "
2584                     << std::hex << std::setw(4) << std::setfill('0') << s.first
2585                     << "." << std::hex << std::setw(4) << std::setfill('0') << i;
2586         }
2587     }
2588 }
2589 
expire_services(const boost::asio::ip::address & _address)2590 void routing_manager_impl::expire_services(
2591         const boost::asio::ip::address &_address) {
2592     expire_services(_address, configuration::port_range_t(ANY_PORT, ANY_PORT),
2593             false);
2594 }
2595 
expire_services(const boost::asio::ip::address & _address,std::uint16_t _port,bool _reliable)2596 void routing_manager_impl::expire_services(
2597         const boost::asio::ip::address &_address, std::uint16_t _port,
2598         bool _reliable) {
2599     expire_services(_address, configuration::port_range_t(_port, _port),
2600             _reliable);
2601 }
2602 
expire_services(const boost::asio::ip::address & _address,const configuration::port_range_t & _range,bool _reliable)2603 void routing_manager_impl::expire_services(
2604         const boost::asio::ip::address &_address,
2605         const configuration::port_range_t& _range, bool _reliable) {
2606     std::map<service_t, std::vector<instance_t> > its_expired_offers;
2607 
2608     const bool expire_all = (_range.first == ANY_PORT
2609             && _range.second == ANY_PORT);
2610 
2611     for (auto &s : get_services_remote()) {
2612         for (auto &i : s.second) {
2613             boost::asio::ip::address its_address;
2614             std::shared_ptr<client_endpoint> its_client_endpoint =
2615                     std::dynamic_pointer_cast<client_endpoint>(
2616                             i.second->get_endpoint(_reliable));
2617             if (!its_client_endpoint && expire_all) {
2618                 its_client_endpoint = std::dynamic_pointer_cast<client_endpoint>(
2619                                 i.second->get_endpoint(!_reliable));
2620             }
2621             if (its_client_endpoint) {
2622                 if ((expire_all || (its_client_endpoint->get_remote_port() >= _range.first
2623                                     && its_client_endpoint->get_remote_port() <= _range.second))
2624                         && its_client_endpoint->get_remote_address(its_address)
2625                         && its_address == _address) {
2626                     if (discovery_) {
2627                         discovery_->unsubscribe_all(s.first, i.first);
2628                     }
2629                     its_expired_offers[s.first].push_back(i.first);
2630                 }
2631             }
2632         }
2633     }
2634 
2635     for (auto &s : its_expired_offers) {
2636         for (auto &i : s.second) {
2637             VSOMEIP_INFO << "expire_services for address: " << _address
2638                     << " : delete service/instance "
2639                     << std::hex << std::setw(4) << std::setfill('0') << s.first
2640                     << "." << std::hex << std::setw(4) << std::setfill('0') << i
2641                     << " port [" << std::dec << _range.first << "," << _range.second
2642                     << "] reliability=" << std::boolalpha << _reliable;
2643             del_routing_info(s.first, i, true, true);
2644         }
2645     }
2646 }
2647 
2648 void
expire_subscriptions(const boost::asio::ip::address & _address)2649 routing_manager_impl::expire_subscriptions(
2650         const boost::asio::ip::address &_address) {
2651     expire_subscriptions(_address,
2652             configuration::port_range_t(ANY_PORT, ANY_PORT), false);
2653 }
2654 
2655 void
expire_subscriptions(const boost::asio::ip::address & _address,std::uint16_t _port,bool _reliable)2656 routing_manager_impl::expire_subscriptions(
2657         const boost::asio::ip::address &_address, std::uint16_t _port,
2658         bool _reliable) {
2659     expire_subscriptions(_address, configuration::port_range_t(_port, _port),
2660             _reliable);
2661 }
2662 
2663 void
expire_subscriptions(const boost::asio::ip::address & _address,const configuration::port_range_t & _range,bool _reliable)2664 routing_manager_impl::expire_subscriptions(
2665         const boost::asio::ip::address &_address,
2666         const configuration::port_range_t& _range, bool _reliable) {
2667     const bool expire_all = (_range.first == ANY_PORT
2668             && _range.second == ANY_PORT);
2669 
2670     std::map<service_t,
2671         std::map<instance_t,
2672             std::map<eventgroup_t,
2673                 std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
2674     {
2675         std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
2676         its_eventgroups = eventgroups_;
2677     }
2678     for (const auto &its_service : its_eventgroups) {
2679         for (const auto &its_instance : its_service.second) {
2680             for (const auto &its_eventgroup : its_instance.second) {
2681                 const auto its_info = its_eventgroup.second;
2682                 for (auto its_subscription
2683                         : its_info->get_remote_subscriptions()) {
2684                     // Note: get_remote_subscription delivers a copied
2685                     // set of subscriptions. Thus, its is possible to
2686                     // to remove them within the loop.
2687                     auto its_ep_definition = (_reliable) ?
2688                                     its_subscription->get_reliable() :
2689                                     its_subscription->get_unreliable();
2690 
2691                     if (!its_ep_definition && expire_all)
2692                         its_ep_definition = (!_reliable) ?
2693                                 its_subscription->get_reliable() :
2694                                 its_subscription->get_unreliable();
2695 
2696                     if (its_ep_definition
2697                             && its_ep_definition->get_address() == _address
2698                             && (expire_all ||
2699                                     (its_ep_definition->get_remote_port() >= _range.first
2700                                     && its_ep_definition->get_remote_port() <= _range.second))) {
2701 
2702                         // TODO: Check whether subscriptions to different hosts are valid.
2703                         // IF yes, we probably need to simply reset the corresponding
2704                         // endpoint instead of removing the subscription...
2705                         VSOMEIP_INFO << __func__
2706                                 << ": removing subscription to "
2707                                 << std::hex << its_info->get_service() << "."
2708                                 << std::hex << its_info->get_instance() << "."
2709                                 << std::hex << its_info->get_eventgroup()
2710                                 << " from target "
2711                                 << its_ep_definition->get_address() << ":"
2712                                 << std::dec << its_ep_definition->get_port()
2713                                 << " reliable="
2714                                 << std::boolalpha << its_ep_definition->is_reliable();
2715                         if (expire_all) {
2716                             its_ep_definition = (!its_ep_definition->is_reliable()) ?
2717                                     its_subscription->get_reliable() :
2718                                     its_subscription->get_unreliable();
2719                             if (its_ep_definition) {
2720                                 VSOMEIP_INFO << __func__
2721                                         << ": removing subscription to "
2722                                         << std::hex << its_info->get_service() << "."
2723                                         << std::hex << its_info->get_instance() << "."
2724                                         << std::hex << its_info->get_eventgroup()
2725                                         << " from target "
2726                                         << its_ep_definition->get_address() << ":"
2727                                         << std::dec << its_ep_definition->get_port()
2728                                         << " reliable="
2729                                         << std::boolalpha << its_ep_definition->is_reliable();
2730                             }
2731                         }
2732                         on_remote_unsubscribe(its_subscription);
2733                     }
2734                 }
2735             }
2736         }
2737     }
2738 }
2739 
init_routing_info()2740 void routing_manager_impl::init_routing_info() {
2741     VSOMEIP_INFO<< "Service Discovery disabled. Using static routing information.";
2742     for (auto i : configuration_->get_remote_services()) {
2743         boost::asio::ip::address its_address(
2744                 boost::asio::ip::address::from_string(
2745                     configuration_->get_unicast_address(i.first, i.second)));
2746         uint16_t its_reliable_port
2747             = configuration_->get_reliable_port(i.first, i.second);
2748         uint16_t its_unreliable_port
2749             = configuration_->get_unreliable_port(i.first, i.second);
2750         major_version_t its_major
2751             = configuration_->get_major_version(i.first, i.second);
2752         minor_version_t its_minor
2753             = configuration_->get_minor_version(i.first, i.second);
2754         ttl_t its_ttl
2755             = configuration_->get_ttl(i.first, i.second);
2756 
2757         if (its_reliable_port != ILLEGAL_PORT
2758                 || its_unreliable_port != ILLEGAL_PORT) {
2759 
2760             VSOMEIP_INFO << "Adding static remote service ["
2761                 << std::hex << std::setw(4) << std::setfill('0')
2762                 << i.first << "." << i.second
2763                 << std::dec << ":" <<  +its_major  << "." << its_minor
2764                 << "]";
2765 
2766             add_routing_info(i.first, i.second,
2767                     its_major, its_minor, its_ttl,
2768                     its_address, its_reliable_port,
2769                     its_address, its_unreliable_port);
2770 
2771             if(its_reliable_port != ILLEGAL_PORT) {
2772                 ep_mgr_impl_->find_or_create_remote_client(
2773                         i.first, i.second, true);
2774             }
2775             if(its_unreliable_port != ILLEGAL_PORT) {
2776                 ep_mgr_impl_->find_or_create_remote_client(
2777                         i.first, i.second, false);
2778             }
2779         }
2780     }
2781 }
2782 
on_remote_subscribe(std::shared_ptr<remote_subscription> & _subscription,const remote_subscription_callback_t & _callback)2783 void routing_manager_impl::on_remote_subscribe(
2784         std::shared_ptr<remote_subscription> &_subscription,
2785         const remote_subscription_callback_t &_callback) {
2786     auto its_eventgroupinfo = _subscription->get_eventgroupinfo();
2787     if (!its_eventgroupinfo) {
2788         VSOMEIP_ERROR << __func__ << " eventgroupinfo is invalid";
2789         return;
2790     }
2791     const ttl_t its_ttl = _subscription->get_ttl();
2792 
2793     const auto its_service = its_eventgroupinfo->get_service();
2794     const auto its_instance = its_eventgroupinfo->get_instance();
2795     const auto its_eventgroup = its_eventgroupinfo->get_eventgroup();
2796     const auto its_major = its_eventgroupinfo->get_major();
2797 
2798     // Get remote port(s)
2799     auto its_reliable = _subscription->get_reliable();
2800     if (its_reliable) {
2801         uint16_t its_port
2802             = configuration_->get_reliable_port(its_service, its_instance);
2803         its_reliable->set_remote_port(its_port);
2804     }
2805 
2806     auto its_unreliable = _subscription->get_unreliable();
2807     if (its_unreliable) {
2808         uint16_t its_port
2809             = configuration_->get_unreliable_port(its_service, its_instance);
2810         its_unreliable->set_remote_port(its_port);
2811     }
2812 
2813     // Calculate expiration time
2814     const std::chrono::steady_clock::time_point its_expiration
2815         = std::chrono::steady_clock::now() + std::chrono::seconds(its_ttl);
2816 
2817     // Try to update the subscription. This will fail, if the subscription does
2818     // not exist or is still (partly) pending.
2819     remote_subscription_id_t its_id;
2820     std::set<client_t> its_added;
2821     update_remote_subscription_mutex_.lock();
2822     auto its_result = its_eventgroupinfo->update_remote_subscription(
2823             _subscription, its_expiration, its_added, its_id, true);
2824     if (its_result) {
2825         if (!_subscription->is_pending()) { // resubscription without change
2826             update_remote_subscription_mutex_.unlock();
2827             _callback(_subscription);
2828         } else if (!its_added.empty()) { // new clients for a selective subscription
2829             const client_t its_offering_client
2830                 = find_local_client(its_service, its_instance);
2831             send_subscription(its_offering_client,
2832                     its_service, its_instance, its_eventgroup, its_major,
2833                     its_added, _subscription->get_id());
2834             update_remote_subscription_mutex_.unlock();
2835         } else { // identical subscription is not yet processed
2836             std::stringstream its_warning;
2837             its_warning << __func__ << " a remote subscription is already pending ["
2838                 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
2839                 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
2840                 << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]"
2841                 << " from ";
2842             if (its_reliable && its_unreliable)
2843                 its_warning << "[";
2844             if (its_reliable)
2845                 its_warning << its_reliable->get_address().to_string()
2846                     << ":" << std::dec << its_reliable->get_port();
2847             if (its_reliable && its_unreliable)
2848                 its_warning << ", ";
2849             if (its_unreliable)
2850                 its_warning << its_unreliable->get_address().to_string()
2851                     << ":" << std::dec << its_unreliable->get_port();
2852             if (its_reliable && its_unreliable)
2853                 its_warning << "]";
2854             VSOMEIP_WARNING << its_warning.str();
2855 
2856             update_remote_subscription_mutex_.unlock();
2857             _callback(_subscription);
2858         }
2859     } else { // new subscription
2860         if (its_eventgroupinfo->is_remote_subscription_limit_reached(
2861                 _subscription)) {
2862             _subscription->set_all_client_states(
2863                     remote_subscription_state_e::SUBSCRIPTION_NACKED);
2864 
2865             update_remote_subscription_mutex_.unlock();
2866             _callback(_subscription);
2867             return;
2868         }
2869 
2870         auto its_id
2871             = its_eventgroupinfo->add_remote_subscription(_subscription);
2872 
2873         const client_t its_offering_client
2874             = find_local_client(its_service, its_instance);
2875         send_subscription(its_offering_client,
2876                 its_service, its_instance, its_eventgroup, its_major,
2877                 _subscription->get_clients(), its_id);
2878         update_remote_subscription_mutex_.unlock();
2879     }
2880 }
2881 
on_remote_unsubscribe(std::shared_ptr<remote_subscription> & _subscription)2882 void routing_manager_impl::on_remote_unsubscribe(
2883         std::shared_ptr<remote_subscription> &_subscription) {
2884     std::shared_ptr<eventgroupinfo> its_info
2885         = _subscription->get_eventgroupinfo();
2886     if (!its_info) {
2887         VSOMEIP_ERROR << __func__
2888                 << ": Received Unsubscribe for unregistered eventgroup.";
2889         return;
2890     }
2891 
2892     const auto its_service = its_info->get_service();
2893     const auto its_instance = its_info->get_instance();
2894     const auto its_eventgroup = its_info->get_eventgroup();
2895     const auto its_major = its_info->get_major();
2896 
2897     // Get remote port(s)
2898     auto its_reliable = _subscription->get_reliable();
2899     if (its_reliable) {
2900         uint16_t its_port
2901             = configuration_->get_reliable_port(its_service, its_instance);
2902         its_reliable->set_remote_port(its_port);
2903     }
2904 
2905     auto its_unreliable = _subscription->get_unreliable();
2906     if (its_unreliable) {
2907         uint16_t its_port
2908             = configuration_->get_unreliable_port(its_service, its_instance);
2909         its_unreliable->set_remote_port(its_port);
2910     }
2911 
2912     remote_subscription_id_t its_id(0);
2913     std::set<client_t> its_removed;
2914     update_remote_subscription_mutex_.lock();
2915     auto its_result = its_info->update_remote_subscription(
2916             _subscription, std::chrono::steady_clock::now(),
2917             its_removed, its_id, false);
2918 
2919     if (its_result) {
2920         const client_t its_offering_client
2921             = find_local_client(its_service, its_instance);
2922         send_unsubscription(its_offering_client,
2923                 its_service, its_instance, its_eventgroup, its_major,
2924                 its_removed, its_id);
2925     }
2926 
2927     update_remote_subscription_mutex_.unlock();
2928 }
2929 
on_subscribe_ack_with_multicast(service_t _service,instance_t _instance,const boost::asio::ip::address & _sender,const boost::asio::ip::address & _address,uint16_t _port)2930 void routing_manager_impl::on_subscribe_ack_with_multicast(
2931         service_t _service, instance_t _instance,
2932         const boost::asio::ip::address &_sender,
2933         const boost::asio::ip::address &_address, uint16_t _port) {
2934     ep_mgr_impl_->find_or_create_multicast_endpoint(_service,
2935             _instance, _sender, _address, _port);
2936 }
2937 
on_subscribe_ack(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,remote_subscription_id_t _id)2938 void routing_manager_impl::on_subscribe_ack(client_t _client,
2939         service_t _service, instance_t _instance, eventgroup_t _eventgroup,
2940         event_t _event, remote_subscription_id_t _id) {
2941     std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
2942     auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
2943     if (its_eventgroup) {
2944         auto its_subscription = its_eventgroup->get_remote_subscription(_id);
2945         if (its_subscription) {
2946             its_subscription->set_client_state(_client,
2947                     remote_subscription_state_e::SUBSCRIPTION_ACKED);
2948 
2949             auto its_parent = its_subscription->get_parent();
2950             if (its_parent) {
2951                 its_parent->set_client_state(_client,
2952                         remote_subscription_state_e::SUBSCRIPTION_ACKED);
2953                 if (!its_subscription->is_pending()) {
2954                     its_eventgroup->remove_remote_subscription(_id);
2955                 }
2956             }
2957 
2958             if (discovery_) {
2959                 std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
2960                 remote_subscribers_[_service][_instance][VSOMEIP_ROUTING_CLIENT].insert(
2961                         its_subscription->get_subscriber());
2962                 discovery_->update_remote_subscription(its_subscription);
2963 
2964                 VSOMEIP_INFO << "REMOTE SUBSCRIBE("
2965                     << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
2966                     << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2967                     << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2968                     << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
2969                     << " from " << its_subscription->get_subscriber()->get_address()
2970                     << ":" << std::dec << its_subscription->get_subscriber()->get_port()
2971                     << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable")
2972                     << " was accepted";
2973 
2974                 return;
2975             }
2976         } else {
2977             const auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _client);
2978             const auto its_state = remote_subscription_state_.find(its_tuple);
2979             if (its_state != remote_subscription_state_.end()) {
2980                 if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
2981                     // Already notified!
2982                     return;
2983                 }
2984             }
2985             remote_subscription_state_[its_tuple] = subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED;
2986         }
2987 
2988         std::set<client_t> subscribed_clients;
2989         if (_client == VSOMEIP_ROUTING_CLIENT) {
2990             for (const auto &its_event : its_eventgroup->get_events()) {
2991                 if (_event == ANY_EVENT || _event == its_event->get_event()) {
2992                     const auto &its_subscribers = its_event->get_subscribers();
2993                     subscribed_clients.insert(its_subscribers.begin(), its_subscribers.end());
2994                 }
2995             }
2996         } else {
2997             subscribed_clients.insert(_client);
2998         }
2999 
3000         for (const auto &its_subscriber : subscribed_clients) {
3001             if (its_subscriber == get_client()) {
3002                 if (_event == ANY_EVENT) {
3003                     for (const auto &its_event : its_eventgroup->get_events()) {
3004                         host_->on_subscription_status(_service, _instance,
3005                                 _eventgroup, its_event->get_event(),
3006                                 0x0 /*OK*/);
3007                     }
3008                 } else {
3009                     host_->on_subscription_status(_service, _instance,
3010                             _eventgroup, _event, 0x0 /*OK*/);
3011                 }
3012             } else {
3013                 stub_->send_subscribe_ack(its_subscriber, _service,
3014                         _instance, _eventgroup, _event);
3015             }
3016         }
3017      }
3018 }
3019 
find_or_create_remote_client(service_t _service,instance_t _instance,bool _reliable)3020 std::shared_ptr<endpoint> routing_manager_impl::find_or_create_remote_client(
3021         service_t _service, instance_t _instance, bool _reliable) {
3022     return ep_mgr_impl_->find_or_create_remote_client(_service,
3023             _instance, _reliable);
3024 }
3025 
on_subscribe_nack(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,remote_subscription_id_t _id,bool _simulated)3026 void routing_manager_impl::on_subscribe_nack(client_t _client,
3027         service_t _service, instance_t _instance, eventgroup_t _eventgroup,
3028         event_t _event, remote_subscription_id_t _id, bool _simulated) {
3029     (void)_event; // TODO: Remove completely?
3030 
3031     auto its_eventgroup = find_eventgroup(_service, _instance, _eventgroup);
3032     if (its_eventgroup) {
3033         auto its_subscription = its_eventgroup->get_remote_subscription(_id);
3034         if (its_subscription) {
3035             if (_simulated) {
3036                 // method was called because a subscription for unoffered
3037                 // service was received. Therefore, remove the remote_subscription
3038                 // from the eventgroupinfo to ensure subsequent similar
3039                 // subscriptions are handled like a new/unknown subscription
3040                 its_eventgroup->remove_remote_subscription(_id);
3041             }
3042             its_subscription->set_client_state(_client,
3043                     remote_subscription_state_e::SUBSCRIPTION_NACKED);
3044 
3045             auto its_parent = its_subscription->get_parent();
3046             if (its_parent) {
3047                 its_parent->set_client_state(_client,
3048                         remote_subscription_state_e::SUBSCRIPTION_NACKED);
3049                 if (!its_subscription->is_pending()) {
3050                     its_eventgroup->remove_remote_subscription(_id);
3051                 }
3052             }
3053 
3054             if (discovery_) {
3055                 discovery_->update_remote_subscription(its_subscription);
3056                 VSOMEIP_INFO << "REMOTE SUBSCRIBE("
3057                     << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
3058                     << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3059                     << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
3060                     << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
3061                     << " from " << its_subscription->get_subscriber()->get_address()
3062                     << ":" << std::dec << its_subscription->get_subscriber()->get_port()
3063                     << (its_subscription->get_subscriber()->is_reliable() ? " reliable" : " unreliable")
3064                     << " was not accepted";
3065             }
3066         }
3067     }
3068 }
3069 
check_error(const byte_t * _data,length_t _size,instance_t _instance)3070 return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _size,
3071         instance_t _instance) {
3072 
3073     service_t its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
3074             _data[VSOMEIP_SERVICE_POS_MAX]);
3075 
3076     if (_size >= VSOMEIP_PAYLOAD_POS) {
3077         if (utility::is_request(_data[VSOMEIP_MESSAGE_TYPE_POS])
3078                 || utility::is_request_no_return(_data[VSOMEIP_MESSAGE_TYPE_POS]) ) {
3079             if (_data[VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
3080                 VSOMEIP_WARNING << "Received a message with unsupported protocol version for service 0x"
3081                         << std::hex << its_service;
3082                 return return_code_e::E_WRONG_PROTOCOL_VERSION;
3083             }
3084             if (_instance == 0xFFFF) {
3085                 VSOMEIP_WARNING << "Receiving endpoint is not configured for service 0x"
3086                         << std::hex << its_service;
3087                 return return_code_e::E_UNKNOWN_SERVICE;
3088             }
3089             // Check interface version of service/instance
3090             auto its_info = find_service(its_service, _instance);
3091             if (its_info) {
3092                 major_version_t its_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
3093                 if (its_version != its_info->get_major()) {
3094                     VSOMEIP_WARNING << "Received a message with unsupported interface version for service 0x"
3095                             << std::hex << its_service;
3096                     return return_code_e::E_WRONG_INTERFACE_VERSION;
3097                 }
3098             }
3099             if (_data[VSOMEIP_RETURN_CODE_POS] != static_cast<byte_t> (return_code_e::E_OK)) {
3100                 // Request calls must to have return code E_OK set!
3101                 VSOMEIP_WARNING << "Received a message with unsupported return code set for service 0x"
3102                         << std::hex << its_service;
3103                 return return_code_e::E_NOT_OK;
3104             }
3105         }
3106     } else {
3107         // Message shorter than vSomeIP message header
3108         VSOMEIP_WARNING << "Received a message message which is shorter than vSomeIP message header!";
3109         return return_code_e::E_MALFORMED_MESSAGE;
3110     }
3111     return return_code_e::E_OK;
3112 }
3113 
send_error(return_code_e _return_code,const byte_t * _data,length_t _size,instance_t _instance,bool _reliable,endpoint * const _receiver,const boost::asio::ip::address & _remote_address,std::uint16_t _remote_port)3114 void routing_manager_impl::send_error(return_code_e _return_code,
3115         const byte_t *_data, length_t _size,
3116         instance_t _instance, bool _reliable,
3117         endpoint* const _receiver,
3118         const boost::asio::ip::address &_remote_address,
3119         std::uint16_t _remote_port) {
3120 
3121     client_t its_client = 0;
3122     service_t its_service = 0;
3123     method_t its_method = 0;
3124     session_t its_session = 0;
3125     major_version_t its_version = 0;
3126 
3127     if (_size >= VSOMEIP_CLIENT_POS_MAX) {
3128         its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
3129                 _data[VSOMEIP_CLIENT_POS_MAX]);
3130     }
3131     if (_size >= VSOMEIP_SERVICE_POS_MAX) {
3132         its_service = VSOMEIP_BYTES_TO_WORD(
3133                 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
3134     }
3135     if (_size >= VSOMEIP_METHOD_POS_MAX) {
3136         its_method = VSOMEIP_BYTES_TO_WORD(
3137                 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
3138     }
3139     if (_size >= VSOMEIP_SESSION_POS_MAX) {
3140         its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
3141                 _data[VSOMEIP_SESSION_POS_MAX]);
3142     }
3143     if( _size >= VSOMEIP_INTERFACE_VERSION_POS) {
3144         its_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
3145     }
3146 
3147     auto error_message = runtime::get()->create_message(_reliable);
3148     error_message->set_client(its_client);
3149     error_message->set_instance(_instance);
3150     error_message->set_interface_version(its_version);
3151     error_message->set_message_type(message_type_e::MT_ERROR);
3152     error_message->set_method(its_method);
3153     error_message->set_return_code(_return_code);
3154     error_message->set_service(its_service);
3155     error_message->set_session(its_session);
3156     {
3157         std::shared_ptr<serializer> its_serializer(get_serializer());
3158         if (its_serializer->serialize(error_message.get())) {
3159             if (_receiver) {
3160                 auto its_endpoint_def = std::make_shared<endpoint_definition>(
3161                         _remote_address, _remote_port,
3162                         _receiver->is_reliable());
3163                 its_endpoint_def->set_remote_port(_receiver->get_local_port());
3164                 std::shared_ptr<endpoint> its_endpoint =
3165                         ep_mgr_impl_->find_server_endpoint(
3166                                 its_endpoint_def->get_remote_port(),
3167                                 its_endpoint_def->is_reliable());
3168                 if (its_endpoint) {
3169                     #ifdef USE_DLT
3170                         const uint16_t its_data_size
3171                             = uint16_t(_size > USHRT_MAX ? USHRT_MAX : _size);
3172 
3173                         trace::header its_header;
3174                         if (its_header.prepare(its_endpoint, true, _instance))
3175                             tc_->trace(its_header.data_, VSOMEIP_TRACE_HEADER_SIZE,
3176                                     _data, its_data_size);
3177                     #else
3178                         (void) _instance;
3179                     #endif
3180                     its_endpoint->send_error(its_endpoint_def,
3181                             its_serializer->get_data(), its_serializer->get_size());
3182                 }
3183             }
3184             its_serializer->reset();
3185             put_serializer(its_serializer);
3186         } else {
3187             VSOMEIP_ERROR<< "Failed to serialize error message.";
3188         }
3189     }
3190 }
3191 
clear_remote_subscriber(service_t _service,instance_t _instance,client_t _client,const std::shared_ptr<endpoint_definition> & _target)3192 void routing_manager_impl::clear_remote_subscriber(
3193         service_t _service, instance_t _instance, client_t _client,
3194         const std::shared_ptr<endpoint_definition> &_target) {
3195     std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
3196     auto its_service = remote_subscribers_.find(_service);
3197     if (its_service != remote_subscribers_.end()) {
3198         auto its_instance = its_service->second.find(_instance);
3199         if (its_instance != its_service->second.end()) {
3200             auto its_client = its_instance->second.find(_client);
3201             if (its_client != its_instance->second.end()) {
3202                 if (its_client->second.erase(_target)) {
3203                     if (!its_client->second.size()) {
3204                         its_instance->second.erase(_client);
3205                     }
3206                 }
3207             }
3208         }
3209     }
3210 }
3211 
3212 std::chrono::steady_clock::time_point
expire_subscriptions(bool _force)3213 routing_manager_impl::expire_subscriptions(bool _force) {
3214     std::map<service_t,
3215         std::map<instance_t,
3216             std::map<eventgroup_t,
3217                 std::shared_ptr<eventgroupinfo> > > >its_eventgroups;
3218     std::map<std::shared_ptr<remote_subscription>,
3219         std::set<client_t> > its_expired_subscriptions;
3220 
3221     std::chrono::steady_clock::time_point now
3222         = std::chrono::steady_clock::now();
3223     std::chrono::steady_clock::time_point its_next_expiration
3224         = std::chrono::steady_clock::now() + std::chrono::hours(24);
3225     {
3226         std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
3227         its_eventgroups = eventgroups_;
3228     }
3229 
3230     for (auto &its_service : its_eventgroups) {
3231         for (auto &its_instance : its_service.second) {
3232             for (auto &its_eventgroup : its_instance.second) {
3233                 auto its_subscriptions
3234                     = its_eventgroup.second->get_remote_subscriptions();
3235                 for (auto &s : its_subscriptions) {
3236                     for (auto its_client : s->get_clients()) {
3237                         if (_force) {
3238                             its_expired_subscriptions[s].insert(its_client);
3239                         } else {
3240                             auto its_expiration = s->get_expiration(its_client);
3241                             if (its_expiration != std::chrono::steady_clock::time_point()) {
3242                                 if (its_expiration < now) {
3243                                     its_expired_subscriptions[s].insert(its_client);
3244                                 } else if (its_expiration < its_next_expiration) {
3245                                     its_next_expiration = its_expiration;
3246                                 }
3247                             }
3248                         }
3249                     }
3250                 }
3251             }
3252         }
3253     }
3254 
3255     for (auto &s : its_expired_subscriptions) {
3256         auto its_info = s.first->get_eventgroupinfo();
3257         if (its_info) {
3258             auto its_service = its_info->get_service();
3259             auto its_instance = its_info->get_instance();
3260             auto its_eventgroup = its_info->get_eventgroup();
3261 
3262             remote_subscription_id_t its_id;
3263             update_remote_subscription_mutex_.lock();
3264             auto its_result = its_info->update_remote_subscription(
3265                     s.first, std::chrono::steady_clock::now(),
3266                     s.second, its_id, false);
3267             if (its_result) {
3268                 const client_t its_offering_client
3269                     = find_local_client(its_service, its_instance);
3270                 const auto its_subscription = its_info->get_remote_subscription(its_id);
3271                 if (its_subscription) {
3272                     its_info->remove_remote_subscription(its_id);
3273 
3274                     std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
3275                     remote_subscribers_[its_service][its_instance].erase(its_offering_client);
3276 
3277                     if (its_info->get_remote_subscriptions().size() == 0) {
3278                         for (const auto &its_event : its_info->get_events()) {
3279                             bool has_remote_subscriber(false);
3280                             for (const auto &its_eventgroup : its_event->get_eventgroups()) {
3281                                const auto its_eventgroup_info
3282                                    = find_eventgroup(its_service, its_instance, its_eventgroup);
3283                                 if (its_eventgroup_info
3284                                         && its_eventgroup_info->get_remote_subscriptions().size() > 0) {
3285                                     has_remote_subscriber = true;
3286                                 }
3287                             }
3288                             if (!has_remote_subscriber && its_event->is_shadow()) {
3289                                 its_event->unset_payload();
3290                             }
3291                         }
3292                     }
3293                 } else {
3294                     VSOMEIP_ERROR << __func__
3295                         << ": Unknown expired subscription " << std::dec << its_id << " for eventgroup ["
3296                         << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
3297                         << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
3298                         << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup << "]";
3299                 }
3300                 send_expired_subscription(its_offering_client,
3301                         its_service, its_instance, its_eventgroup,
3302                         s.second, s.first->get_id());
3303             }
3304             update_remote_subscription_mutex_.unlock();
3305 
3306             if (s.first->get_unreliable()) {
3307                 VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
3308                         << std::hex << std::setfill('0') << std::setw(4) << its_service << "."
3309                         << std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
3310                         << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] unreliable from "
3311                         << s.first->get_unreliable()->get_address() << ":"
3312                         << std::dec << s.first->get_unreliable()->get_port();
3313             }
3314 
3315             if (s.first->get_reliable()) {
3316                 VSOMEIP_INFO << (_force ? "Removed" : "Expired") << " subscription ["
3317                         << std::hex << std::setfill('0') << std::setw(4) << its_service << "."
3318                         << std::hex << std::setfill('0') << std::setw(4) << its_instance << "."
3319                         << std::hex << std::setfill('0') << std::setw(4) << its_eventgroup << "] reliable from "
3320                         << s.first->get_reliable()->get_address() << ":"
3321                         << std::dec << s.first->get_reliable()->get_port();
3322             }
3323         }
3324     }
3325 
3326     return its_next_expiration;
3327 }
3328 
log_version_timer_cbk(boost::system::error_code const & _error)3329 void routing_manager_impl::log_version_timer_cbk(boost::system::error_code const & _error) {
3330     if (!_error) {
3331 
3332 #ifndef VSOMEIP_VERSION
3333 #define VSOMEIP_VERSION "unknown version"
3334 #endif
3335         static int its_counter(0);
3336         static uint32_t its_interval = configuration_->get_log_version_interval();
3337 
3338         bool is_diag_mode(false);
3339 
3340         if (discovery_) {
3341             is_diag_mode = discovery_->get_diagnosis_mode();
3342         }
3343         std::stringstream its_last_resume;
3344         {
3345             std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
3346             if (last_resume_ != std::chrono::steady_clock::time_point::min()) {
3347                 its_last_resume << " | " << std::dec
3348                         << std::chrono::duration_cast<std::chrono::seconds>(
3349                         std::chrono::steady_clock::now() - last_resume_).count() << "s";
3350             }
3351         }
3352 
3353         VSOMEIP_INFO << "vSomeIP " << VSOMEIP_VERSION << " | ("
3354                 << ((is_diag_mode == true) ? "diagnosis)" : "default)")
3355                 << its_last_resume.str();
3356 
3357         its_counter++;
3358         if (its_counter == 6) {
3359             ep_mgr_->log_client_states();
3360             ep_mgr_impl_->log_client_states();
3361             its_counter = 0;
3362         }
3363 
3364         {
3365             std::lock_guard<std::mutex> its_lock(version_log_timer_mutex_);
3366             version_log_timer_.expires_from_now(std::chrono::seconds(its_interval));
3367             version_log_timer_.async_wait(
3368                     std::bind(&routing_manager_impl::log_version_timer_cbk,
3369                               this, std::placeholders::_1));
3370         }
3371     }
3372 }
3373 
handle_local_offer_service(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)3374 bool routing_manager_impl::handle_local_offer_service(client_t _client, service_t _service,
3375         instance_t _instance, major_version_t _major,minor_version_t _minor) {
3376     {
3377         std::lock_guard<std::mutex> its_lock(local_services_mutex_);
3378         auto found_service = local_services_.find(_service);
3379         if (found_service != local_services_.end()) {
3380             auto found_instance = found_service->second.find(_instance);
3381             if (found_instance != found_service->second.end()) {
3382                 const major_version_t its_stored_major(std::get<0>(found_instance->second));
3383                 const minor_version_t its_stored_minor(std::get<1>(found_instance->second));
3384                 const client_t its_stored_client(std::get<2>(found_instance->second));
3385                 if (   its_stored_major == _major
3386                     && its_stored_minor == _minor
3387                     && its_stored_client == _client) {
3388                     VSOMEIP_WARNING << "routing_manager_impl::handle_local_offer_service: "
3389                         << "Application: " << std::hex << std::setfill('0')
3390                         << std::setw(4) << _client << " is offering: ["
3391                         << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3392                         << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3393                         << std::dec << static_cast<std::uint32_t>(_major) << "."
3394                         << _minor << "] offered previously by itself.";
3395                     return false;
3396                 } else if (   its_stored_major == _major
3397                            && its_stored_minor == _minor
3398                            && its_stored_client != _client) {
3399                     // check if previous offering application is still alive
3400                     bool already_pinged(false);
3401                     {
3402                         std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3403                         auto found_service2 = pending_offers_.find(_service);
3404                         if (found_service2 != pending_offers_.end()) {
3405                             auto found_instance2 = found_service2->second.find(_instance);
3406                             if (found_instance2 != found_service2->second.end()) {
3407                                 if(std::get<2>(found_instance2->second) == _client) {
3408                                     already_pinged = true;
3409                                 } else {
3410                                     VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3411                                         << "rejecting service registration. Application: "
3412                                         << std::hex << std::setfill('0') << std::setw(4)
3413                                         << _client << " is trying to offer ["
3414                                         << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3415                                         << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3416                                         << std::dec << static_cast<std::uint32_t>(_major) << "."
3417                                         << std::dec << _minor
3418                                         << "] current pending offer by application: " << std::hex
3419                                         << std::setfill('0') << std::setw(4)
3420                                         << its_stored_client << ": ["
3421                                         << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3422                                         << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3423                                         << std::dec << static_cast<std::uint32_t>(its_stored_major)
3424                                         << "." << its_stored_minor << "]";
3425                                     return false;
3426                                 }
3427                             }
3428                         }
3429                     }
3430                     if (!already_pinged) {
3431                         // find out endpoint of previously offering application
3432                         std::shared_ptr<local_client_endpoint_base_impl>
3433                             its_old_endpoint
3434                                 = std::dynamic_pointer_cast<local_client_endpoint_base_impl>(
3435                                         find_local(its_stored_client));
3436                         if (its_old_endpoint) {
3437                             std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3438                             if(stub_->send_ping(its_stored_client)) {
3439                                 pending_offers_[_service][_instance] =
3440                                         std::make_tuple(_major, _minor, _client,
3441                                                         its_stored_client);
3442                                 VSOMEIP_WARNING << "OFFER("
3443                                     << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
3444                                     << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3445                                     << std::hex << std::setw(4) << std::setfill('0') << _instance
3446                                     << ":" << std::dec << int(_major) << "." << std::dec << _minor
3447                                     << "] is now pending. Waiting for pong from application: "
3448                                     << std::hex << std::setw(4) << std::setfill('0') << its_stored_client;
3449                                 return false;
3450                             }
3451                         } else if (its_stored_client == host_->get_client()) {
3452                             VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3453                                 << "rejecting service registration. Application: "
3454                                 << std::hex << std::setfill('0') << std::setw(4)
3455                                 << _client << " is trying to offer ["
3456                                 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3457                                 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3458                                 << std::dec << static_cast<std::uint32_t>(_major) << "."
3459                                 << std::dec << _minor
3460                                 << "] offered previously by routing manager stub itself with application: "
3461                                 << std::hex << std::setfill('0') << std::setw(4)
3462                                 << its_stored_client << ": ["
3463                                 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3464                                 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3465                                 << std::dec << static_cast<std::uint32_t>(its_stored_major)
3466                                 << "." << its_stored_minor << "] which is still alive";
3467                             return false;
3468                         }
3469                     } else {
3470                         return false;
3471                     }
3472                 } else {
3473                     VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3474                         << "rejecting service registration. Application: "
3475                         << std::hex << std::setfill('0') << std::setw(4)
3476                         << _client << " is trying to offer ["
3477                         << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3478                         << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3479                         << std::dec << static_cast<std::uint32_t>(_major) << "."
3480                         << std::dec << _minor
3481                         << "] offered previously by application: " << std::hex
3482                         << std::setfill('0') << std::setw(4)
3483                         << its_stored_client << ": ["
3484                         << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3485                         << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3486                         << std::dec << static_cast<std::uint32_t>(its_stored_major)
3487                         << "." << its_stored_minor << "]";
3488                     return false;
3489                 }
3490             }
3491         }
3492 
3493         // check if the same service instance is already offered remotely
3494         if (routing_manager_base::offer_service(_client, _service, _instance,
3495                 _major, _minor)) {
3496             local_services_[_service][_instance] = std::make_tuple(_major,
3497                     _minor, _client);
3498         } else {
3499             VSOMEIP_ERROR << "routing_manager_impl::handle_local_offer_service: "
3500                 << "rejecting service registration. Application: "
3501                 << std::hex << std::setfill('0') << std::setw(4)
3502                 << _client << " is trying to offer ["
3503                 << std::hex << std::setfill('0') << std::setw(4) << _service << "."
3504                 << std::hex << std::setfill('0') << std::setw(4) << _instance << "."
3505                 << std::dec << static_cast<std::uint32_t>(_major) << "."
3506                 << std::dec << _minor << "]"
3507                 << "] already offered remotely";
3508             return false;
3509         }
3510     }
3511     return true;
3512 }
3513 
on_pong(client_t _client)3514 void routing_manager_impl::on_pong(client_t _client) {
3515     std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3516     if (pending_offers_.size() == 0) {
3517         return;
3518     }
3519     for (auto service_iter = pending_offers_.begin();
3520             service_iter != pending_offers_.end(); ) {
3521         for (auto instance_iter = service_iter->second.begin();
3522                 instance_iter != service_iter->second.end(); ) {
3523             if (std::get<3>(instance_iter->second) == _client) {
3524                 // received pong from an application were another application wants
3525                 // to offer its service, delete the other applications offer as
3526                 // the current offering application is still alive
3527                 VSOMEIP_WARNING << "OFFER("
3528                     << std::hex << std::setw(4) << std::setfill('0')
3529                     << std::get<2>(instance_iter->second) <<"): ["
3530                     << std::hex << std::setw(4) << std::setfill('0')
3531                     << service_iter->first << "."
3532                     << std::hex << std::setw(4) << std::setfill('0')
3533                     << instance_iter->first << ":" << std::dec
3534                     << std::uint32_t(std::get<0>(instance_iter->second))
3535                     << "." << std::dec << std::get<1>(instance_iter->second)
3536                     << "] was rejected as application: "
3537                     << std::hex << std::setw(4) << std::setfill('0') << _client
3538                     << " is still alive";
3539                 instance_iter = service_iter->second.erase(instance_iter);
3540             } else {
3541                 ++instance_iter;
3542             }
3543         }
3544 
3545         if (service_iter->second.size() == 0) {
3546             service_iter = pending_offers_.erase(service_iter);
3547         } else {
3548             ++service_iter;
3549         }
3550     }
3551 }
3552 
register_client_error_handler(client_t _client,const std::shared_ptr<endpoint> & _endpoint)3553 void routing_manager_impl::register_client_error_handler(client_t _client,
3554         const std::shared_ptr<endpoint> &_endpoint) {
3555     _endpoint->register_error_handler(
3556         std::bind(&routing_manager_impl::handle_client_error, this, _client));
3557 }
3558 
handle_client_error(client_t _client)3559 void routing_manager_impl::handle_client_error(client_t _client) {
3560     VSOMEIP_INFO << "Client 0x" << std::hex << get_client()
3561             << " handles a client error(" << std::hex << _client << ")";
3562     if (stub_)
3563         stub_->update_registration(_client, registration_type_e::DEREGISTER_ON_ERROR);
3564 
3565     std::forward_list<std::tuple<client_t, service_t, instance_t, major_version_t,
3566                                         minor_version_t>> its_offers;
3567     {
3568         std::lock_guard<std::mutex> its_lock(pending_offers_mutex_);
3569         if (pending_offers_.size() == 0) {
3570             return;
3571         }
3572 
3573         for (auto service_iter = pending_offers_.begin();
3574                 service_iter != pending_offers_.end(); ) {
3575             for (auto instance_iter = service_iter->second.begin();
3576                     instance_iter != service_iter->second.end(); ) {
3577                 if (std::get<3>(instance_iter->second) == _client) {
3578                     VSOMEIP_WARNING << "OFFER("
3579                         << std::hex << std::setw(4) << std::setfill('0')
3580                         << std::get<2>(instance_iter->second) <<"): ["
3581                         << std::hex << std::setw(4) << std::setfill('0')
3582                         << service_iter->first << "."
3583                         << std::hex << std::setw(4) << std::setfill('0')
3584                         << instance_iter->first << ":" << std::dec
3585                         << std::uint32_t(std::get<0>(instance_iter->second))
3586                         << "." << std::dec << std::get<1>(instance_iter->second)
3587                         << "] is not pending anymore as application: "
3588                         << std::hex << std::setw(4) << std::setfill('0')
3589                         << std::get<3>(instance_iter->second)
3590                         << " is dead. Offering again!";
3591                     its_offers.push_front(std::make_tuple(
3592                                     std::get<2>(instance_iter->second),
3593                                     service_iter->first,
3594                                     instance_iter->first,
3595                                     std::get<0>(instance_iter->second),
3596                                     std::get<1>(instance_iter->second)));
3597                     instance_iter = service_iter->second.erase(instance_iter);
3598                 } else {
3599                     ++instance_iter;
3600                 }
3601             }
3602 
3603             if (service_iter->second.size() == 0) {
3604                 service_iter = pending_offers_.erase(service_iter);
3605             } else {
3606                 ++service_iter;
3607             }
3608         }
3609     }
3610     for (const auto &offer : its_offers) {
3611         offer_service(std::get<0>(offer), std::get<1>(offer), std::get<2>(offer),
3612                 std::get<3>(offer), std::get<4>(offer), true);
3613     }
3614 }
3615 
// Returns the endpoint manager implementation (ep_mgr_impl_) held by this
// routing manager. The returned shared pointer is a copy of the member.
std::shared_ptr<endpoint_manager_impl> routing_manager_impl::get_endpoint_manager() const {
    return ep_mgr_impl_;
}
3619 
send_subscribe(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,event_t _event)3620 void routing_manager_impl::send_subscribe(client_t _client, service_t _service,
3621         instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
3622         event_t _event) {
3623     auto endpoint = ep_mgr_->find_local(_service, _instance);
3624     if (endpoint) {
3625         stub_->send_subscribe(endpoint, _client,
3626                 _service, _instance, _eventgroup, _major, _event, PENDING_SUBSCRIPTION_ID);
3627     }
3628 }
3629 
set_routing_state(routing_state_e _routing_state)3630 void routing_manager_impl::set_routing_state(routing_state_e _routing_state) {
3631 
3632     // Ignore setting to the current routing state
3633     {
3634         std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
3635         if (routing_state_ == _routing_state) {
3636             VSOMEIP_INFO << "rmi::" << __func__ << " No routing state change --> do nothing.";
3637             return;
3638         }
3639 
3640         routing_state_ = _routing_state;
3641     }
3642 
3643     if(discovery_) {
3644         switch (_routing_state) {
3645             case routing_state_e::RS_SUSPENDED:
3646             {
3647                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode, diagnosis mode is "
3648                     << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3649 
3650                 // stop processing of incoming SD messages
3651                 discovery_->stop();
3652 
3653                 VSOMEIP_INFO << "rmi::" << __func__ << " Inform all applications that we are going to suspend";
3654                 send_suspend();
3655 
3656                 // remove all remote subscriptions to remotely offered services on this node
3657                 expire_subscriptions(true);
3658 
3659                 // send StopOffer messages for remotely offered services on this node
3660                 for (const auto &its_service : get_offered_services()) {
3661                     for (const auto &its_instance : its_service.second) {
3662                         if (its_instance.second->get_endpoint(true) || its_instance.second->get_endpoint(false)) {
3663                             const client_t its_client(find_local_client(its_service.first, its_instance.first));
3664                             VSOMEIP_WARNING << "service "
3665                                 << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
3666                                 << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << " still offered by "
3667                                 << std::hex << std::setw(4) << std::setfill('0') << its_client;
3668                         }
3669                         discovery_->stop_offer_service(its_instance.second);
3670                     }
3671                 }
3672                 {
3673                     std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
3674                     remote_subscription_state_.clear();
3675                 }
3676 
3677                 // send StopSubscribes and clear subscribed_ map
3678                 discovery_->unsubscribe_all_on_suspend();
3679 
3680                 // mark all external services as offline
3681                 services_t its_remote_services;
3682                 {
3683                     std::lock_guard<std::mutex> its_lock(services_remote_mutex_);
3684                     its_remote_services = services_remote_;
3685                 }
3686                 for (const auto &s : its_remote_services) {
3687                     for (const auto &i : s.second) {
3688                         const bool has_reliable(i.second->get_endpoint(true));
3689                         const bool has_unreliable(i.second->get_endpoint(false));
3690                         del_routing_info(s.first, i.first, has_reliable, has_unreliable);
3691 
3692                         // clear all cached payloads of remote services
3693                         unset_all_eventpayloads(s.first, i.first);
3694                     }
3695                 }
3696 
3697                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to suspend mode done, diagnosis mode is "
3698                     << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3699 
3700                 break;
3701             }
3702             case routing_state_e::RS_RESUMED:
3703             {
3704                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode, diagnosis mode was "
3705                     << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3706                 {
3707                     std::lock_guard<std::mutex> its_lock(routing_state_mutex_);
3708                     last_resume_ = std::chrono::steady_clock::now();
3709                 }
3710 
3711                 // Reset relevant in service info
3712                 for (const auto &its_service : get_offered_services()) {
3713                     for (const auto &its_instance : its_service.second) {
3714                         its_instance.second->set_ttl(DEFAULT_TTL);
3715                         its_instance.second->set_is_in_mainphase(false);
3716                     }
3717                 }
3718                 // Switch SD back to normal operation
3719                 discovery_->set_diagnosis_mode(false);
3720 
3721                 if (routing_state_handler_) {
3722                     routing_state_handler_(_routing_state);
3723                 }
3724 
3725                 // start processing of SD messages (incoming remote offers should lead to new subscribe messages)
3726                 discovery_->start();
3727 
3728                 // Trigger initial offer phase for relevant services
3729                 for (const auto &its_service : get_offered_services()) {
3730                     for (const auto &its_instance : its_service.second) {
3731                         discovery_->offer_service(its_instance.second);
3732                     }
3733                 }
3734 
3735                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to resume mode done, diagnosis mode was "
3736                     << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3737                 break;
3738             }
3739             case routing_state_e::RS_DIAGNOSIS:
3740             {
3741                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode.";
3742                 discovery_->set_diagnosis_mode(true);
3743 
3744                 // send StopOffer messages for all someip protocol services
3745                 for (const auto &its_service : get_offered_services()) {
3746                     for (const auto &its_instance : its_service.second) {
3747                         if (host_->get_configuration()->is_someip(
3748                                 its_service.first, its_instance.first)) {
3749                             discovery_->stop_offer_service(its_instance.second);
3750                         }
3751                     }
3752                 }
3753 
3754                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to diagnosis mode done.";
3755                 break;
3756             }
3757             case routing_state_e::RS_RUNNING:
3758                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode, diagnosis mode was "
3759                     << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3760 
3761                 // Reset relevant in service info
3762                 for (const auto &its_service : get_offered_services()) {
3763                     for (const auto &its_instance : its_service.second) {
3764                         if (host_->get_configuration()->is_someip(
3765                                 its_service.first, its_instance.first)) {
3766                             its_instance.second->set_ttl(DEFAULT_TTL);
3767                             its_instance.second->set_is_in_mainphase(false);
3768                         }
3769                     }
3770                 }
3771                 // Switch SD back to normal operation
3772                 discovery_->set_diagnosis_mode(false);
3773 
3774                 // Trigger initial phase for relevant services
3775                 for (const auto &its_service : get_offered_services()) {
3776                     for (const auto &its_instance : its_service.second) {
3777                         if (host_->get_configuration()->is_someip(
3778                                 its_service.first, its_instance.first)) {
3779                             discovery_->offer_service(its_instance.second);
3780                         }
3781                     }
3782                 }
3783 
3784                 VSOMEIP_INFO << "rmi::" << __func__ << " Set routing to running mode done, diagnosis mode was "
3785                     << ((discovery_->get_diagnosis_mode() == true) ? "active." : "inactive.");
3786                 break;
3787             default:
3788                 break;
3789         }
3790     }
3791 }
3792 
on_net_interface_or_route_state_changed(bool _is_interface,std::string _if,bool _available)3793 void routing_manager_impl::on_net_interface_or_route_state_changed(
3794         bool _is_interface, std::string _if, bool _available) {
3795     std::lock_guard<std::mutex> its_lock(pending_sd_offers_mutex_);
3796     auto log_change_message = [&_if, _available, _is_interface](bool _warning) {
3797         std::stringstream ss;
3798         ss << (_is_interface ? "Network interface" : "Route") << " \"" << _if
3799                 << "\" state changed: " << (_available ? "up" : "down");
3800         if (_warning) {
3801             VSOMEIP_WARNING << ss.str();
3802         } else {
3803             VSOMEIP_INFO << ss.str();
3804         }
3805     };
3806     if (_is_interface) {
3807         if (if_state_running_
3808                 || (_available && !if_state_running_ && routing_running_)) {
3809             log_change_message(true);
3810         } else if (!if_state_running_) {
3811             log_change_message(false);
3812         }
3813         if (_available && !if_state_running_) {
3814             if_state_running_ = true;
3815             if (!routing_running_) {
3816                 if(configuration_->is_sd_enabled()) {
3817                     if (sd_route_set_) {
3818                         start_ip_routing();
3819                     }
3820                 } else {
3821                     // Static routing, don't wait for route!
3822                     start_ip_routing();
3823                 }
3824             }
3825         }
3826     } else {
3827         if (sd_route_set_
3828                 || (_available && !sd_route_set_ && routing_running_)) {
3829             log_change_message(true);
3830         } else if (!sd_route_set_) {
3831             log_change_message(false);
3832         }
3833         if (_available && !sd_route_set_) {
3834             sd_route_set_ = true;
3835             if (!routing_running_) {
3836                 if (if_state_running_) {
3837                     start_ip_routing();
3838                 }
3839             }
3840         }
3841     }
3842 }
3843 
start_ip_routing()3844 void routing_manager_impl::start_ip_routing() {
3845 #ifdef _WIN32
3846     if_state_running_ = true;
3847 #endif
3848 
3849     if (routing_ready_handler_) {
3850         routing_ready_handler_();
3851     }
3852     if (discovery_) {
3853         discovery_->start();
3854     } else {
3855         init_routing_info();
3856     }
3857 
3858     for (auto its_service : pending_sd_offers_) {
3859         init_service_info(its_service.first, its_service.second, true);
3860     }
3861     pending_sd_offers_.clear();
3862 
3863     routing_running_ = true;
3864     VSOMEIP_INFO << VSOMEIP_ROUTING_READY_MESSAGE;
3865 }
3866 
requested_service_add(client_t _client,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)3867 void routing_manager_impl::requested_service_add(client_t _client,
3868                                              service_t _service,
3869                                              instance_t _instance,
3870                                              major_version_t _major,
3871                                              minor_version_t _minor) {
3872     std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
3873     requested_services_[_client][_service][_instance].insert({ _major, _minor });
3874 }
3875 
requested_service_remove(client_t _client,service_t _service,instance_t _instance)3876 void routing_manager_impl::requested_service_remove(client_t _client,
3877                                              service_t _service,
3878                                              instance_t _instance) {
3879     std::lock_guard<std::mutex> ist_lock(requested_services_mutex_);
3880     auto found_client = requested_services_.find(_client);
3881     if (found_client != requested_services_.end()) {
3882         auto found_service = found_client->second.find(_service);
3883         if (found_service != found_client->second.end()) {
3884             auto found_instance = found_service->second.find(_instance);
3885             if (found_instance != found_service->second.end()) {
3886                 // delete all requested major/minor versions
3887                 found_service->second.erase(_instance);
3888                 if (!found_service->second.size()) {
3889                     found_client->second.erase(_service);
3890                     if (!found_client->second.size()) {
3891                         requested_services_.erase(_client);
3892                     }
3893                 }
3894             }
3895         }
3896     }
3897 }
3898 
3899 std::set<eventgroup_t>
get_subscribed_eventgroups(service_t _service,instance_t _instance)3900 routing_manager_impl::get_subscribed_eventgroups(
3901         service_t _service, instance_t _instance) {
3902     std::set<eventgroup_t> its_eventgroups;
3903 
3904     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
3905     auto found_service = eventgroups_.find(_service);
3906     if (found_service != eventgroups_.end()) {
3907         auto found_instance = found_service->second.find(_instance);
3908         if (found_instance != found_service->second.end()) {
3909             for (const auto& its_group : found_instance->second) {
3910                 for (const auto& its_event : its_group.second->get_events()) {
3911                     if (its_event->has_subscriber(its_group.first, ANY_CLIENT)) {
3912                         its_eventgroups.insert(its_group.first);
3913                     }
3914                 }
3915             }
3916         }
3917     }
3918 
3919     return its_eventgroups;
3920 }
3921 
clear_targets_and_pending_sub_from_eventgroups(service_t _service,instance_t _instance)3922 void routing_manager_impl::clear_targets_and_pending_sub_from_eventgroups(
3923         service_t _service, instance_t _instance) {
3924     std::vector<std::shared_ptr<event>> its_events;
3925     {
3926         std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
3927         auto found_service = eventgroups_.find(_service);
3928         if (found_service != eventgroups_.end()) {
3929             auto found_instance = found_service->second.find(_instance);
3930             if (found_instance != found_service->second.end()) {
3931                 for (const auto &its_eventgroup : found_instance->second) {
3932                     // As the service is gone, all subscriptions to its events
3933                     // do no longer exist and the last received payload is no
3934                     // longer valid.
3935                     for (auto &its_event : its_eventgroup.second->get_events()) {
3936                         const auto its_subscribers = its_event->get_subscribers();
3937                         for (const auto& its_subscriber : its_subscribers) {
3938                             if (its_subscriber != get_client()) {
3939                                 its_event->remove_subscriber(
3940                                         its_eventgroup.first, its_subscriber);
3941                             }
3942 
3943                             client_t its_client = VSOMEIP_ROUTING_CLIENT; //is_specific_endpoint_client(its_subscriber, _service, _instance);
3944                             {
3945                                 std::lock_guard<std::mutex> its_lock(remote_subscription_state_mutex_);
3946                                 const auto its_tuple =
3947                                     std::make_tuple(found_service->first, found_instance->first,
3948                                                     its_eventgroup.first, its_client);
3949                                 remote_subscription_state_.erase(its_tuple);
3950                             }
3951                         }
3952                         its_events.push_back(its_event);
3953                     }
3954                     // TODO dn: find out why this was commented out
3955                     //its_eventgroup.second->clear_targets();
3956                     //its_eventgroup.second->clear_pending_subscriptions();
3957                 }
3958             }
3959         }
3960     }
3961     for (const auto& e : its_events) {
3962         e->unset_payload(true);
3963     }
3964 }
3965 
clear_remote_subscriber(service_t _service,instance_t _instance)3966 void routing_manager_impl::clear_remote_subscriber(service_t _service,
3967                                                    instance_t _instance) {
3968     std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
3969     auto found_service = remote_subscribers_.find(_service);
3970     if (found_service != remote_subscribers_.end()) {
3971         if (found_service->second.erase(_instance) > 0 &&
3972                 !found_service->second.size()) {
3973             remote_subscribers_.erase(found_service);
3974         }
3975     }
3976 }
3977 
3978 
call_sd_endpoint_connected(const boost::system::error_code & _error,service_t _service,instance_t _instance,const std::shared_ptr<endpoint> & _endpoint,std::shared_ptr<boost::asio::steady_timer> _timer)3979 void routing_manager_impl::call_sd_endpoint_connected(
3980         const boost::system::error_code& _error,
3981         service_t _service, instance_t _instance,
3982         const std::shared_ptr<endpoint>& _endpoint,
3983         std::shared_ptr<boost::asio::steady_timer> _timer) {
3984     (void)_timer;
3985     if (_error) {
3986         return;
3987     }
3988     _endpoint->set_established(true);
3989     if (discovery_) {
3990         discovery_->on_endpoint_connected(_service, _instance,
3991                 _endpoint);
3992     }
3993 }
3994 
create_placeholder_event_and_subscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,client_t _client)3995 bool routing_manager_impl::create_placeholder_event_and_subscribe(
3996         service_t _service, instance_t _instance, eventgroup_t _eventgroup,
3997         event_t _event, client_t _client) {
3998     bool is_inserted(false);
3999     // we received a event which was not yet requested/offered
4000     // create a placeholder field until someone requests/offers this event with
4001     // full information like eventgroup, field or not etc.
4002     std::set<eventgroup_t> its_eventgroups({_eventgroup});
4003 
4004     const client_t its_local_client(find_local_client(_service, _instance));
4005     if (its_local_client == host_->get_client()) {
4006         // received subscription for event of a service instance hosted by
4007         // application acting as rm_impl register with own client id and shadow = false
4008         register_event(host_->get_client(),
4009                 _service, _instance,
4010                 _event,
4011                 its_eventgroups, event_type_e::ET_UNKNOWN, reliability_type_e::RT_UNKNOWN,
4012                 std::chrono::milliseconds::zero(), false, true,
4013                 nullptr, false, false, true);
4014     } else if (its_local_client != VSOMEIP_ROUTING_CLIENT) {
4015         // received subscription for event of a service instance hosted on
4016         // this node register with client id of local_client and set shadow to true
4017         register_event(its_local_client,
4018                 _service, _instance,
4019                 _event, its_eventgroups, event_type_e::ET_UNKNOWN,
4020                 reliability_type_e::RT_UNKNOWN,
4021                 std::chrono::milliseconds::zero(), false, true,
4022                 nullptr, false, true, true);
4023     } else {
4024         // received subscription for event of a unknown or remote service instance
4025         std::shared_ptr<serviceinfo> its_info = find_service(_service,
4026                 _instance);
4027         if (its_info && !its_info->is_local()) {
4028             // remote service, register shadow event with client ID of subscriber
4029             // which should have called register_event
4030             register_event(_client,
4031                     _service, _instance,
4032                     _event, its_eventgroups, event_type_e::ET_UNKNOWN,
4033                     reliability_type_e::RT_UNKNOWN,
4034                     std::chrono::milliseconds::zero(),
4035                     false, true, nullptr, false, true, true);
4036         } else {
4037             VSOMEIP_WARNING
4038                 << "routing_manager_impl::create_placeholder_event_and_subscribe("
4039                 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
4040                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4041                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
4042                 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
4043                 << std::hex << std::setw(4) << std::setfill('0') << _event << "]"
4044                 << " received subscription for unknown service instance.";
4045         }
4046     }
4047 
4048     std::shared_ptr<event> its_event = find_event(_service, _instance, _event);
4049     if (its_event) {
4050         is_inserted = its_event->add_subscriber(_eventgroup, _client, false);
4051     }
4052     return is_inserted;
4053 }
4054 
handle_subscription_state(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)4055 void routing_manager_impl::handle_subscription_state(
4056         client_t _client, service_t _service, instance_t _instance,
4057         eventgroup_t _eventgroup, event_t _event) {
4058 #if 0
4059     VSOMEIP_ERROR << "routing_manager_impl::" << __func__
4060             << "(" << std::hex << _client << "): "
4061             << "event="
4062             << std::hex << _service << "."
4063             << std::hex << _instance << "."
4064             << std::hex << _eventgroup << "."
4065             << std::hex << _event
4066             << " me="
4067             << std::hex << get_client();
4068 #endif
4069     // Note: remote_subscription_state_mutex_ is already locked as this
4070     // method builds a critical section together with insert_subscription
4071     // from routing_manager_base.
4072     // Todo: Improve this situation...
4073     auto its_event = find_event(_service, _instance, _event);
4074     client_t its_client(VSOMEIP_ROUTING_CLIENT);
4075     if (its_event &&
4076             its_event->get_type() == event_type_e::ET_SELECTIVE_EVENT) {
4077         its_client = _client;
4078     }
4079 
4080     auto its_tuple
4081         = std::make_tuple(_service, _instance, _eventgroup, its_client);
4082     auto its_state = remote_subscription_state_.find(its_tuple);
4083     if (its_state != remote_subscription_state_.end()) {
4084 #if 0
4085         VSOMEIP_ERROR << "routing_manager_impl::" << __func__
4086                 << "(" << std::hex << _client << "): "
4087                 << "event="
4088                 << std::hex << _service << "."
4089                 << std::hex << _instance << "."
4090                 << std::hex << _eventgroup << "."
4091                 << std::hex << _event
4092                 << " state=" << std::hex << (int)its_state->second
4093                 << " me="
4094                 << std::hex << get_client();
4095 #endif
4096         if (its_state->second == subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
4097             // Subscription already acknowledged!
4098             if (_client == get_client()) {
4099                 host_->on_subscription_status(_service, _instance, _eventgroup, _event, 0x0 /*OK*/);
4100             } else {
4101                 stub_->send_subscribe_ack(_client, _service, _instance, _eventgroup, _event);
4102             }
4103         }
4104     }
4105 }
4106 
register_sd_acceptance_handler(const sd_acceptance_handler_t & _handler) const4107 void routing_manager_impl::register_sd_acceptance_handler(
4108         const sd_acceptance_handler_t& _handler) const {
4109     if (discovery_) {
4110         discovery_->register_sd_acceptance_handler(_handler);
4111     }
4112 }
4113 
register_reboot_notification_handler(const reboot_notification_handler_t & _handler) const4114 void routing_manager_impl::register_reboot_notification_handler(
4115         const reboot_notification_handler_t& _handler) const {
4116     if (discovery_) {
4117         discovery_->register_reboot_notification_handler(_handler);
4118     }
4119 }
4120 
// Stores a copy of _handler as routing ready handler, replacing any handler
// registered before. start_ip_routing() invokes the stored handler if it is
// set.
// NOTE(review): the assignment is not protected by a lock here - confirm
// that registration cannot race with start_ip_routing().
void routing_manager_impl::register_routing_ready_handler(
        const routing_ready_handler_t& _handler) {
    routing_ready_handler_ = _handler;
}
4125 
register_routing_state_handler(const routing_state_handler_t & _handler)4126 void routing_manager_impl::register_routing_state_handler(
4127         const routing_state_handler_t& _handler) {
4128     routing_state_handler_ = _handler;
4129 }
4130 
sd_acceptance_enabled(const boost::asio::ip::address & _address,const configuration::port_range_t & _range,bool _reliable)4131 void routing_manager_impl::sd_acceptance_enabled(
4132         const boost::asio::ip::address& _address,
4133         const configuration::port_range_t& _range, bool _reliable) {
4134     expire_subscriptions(_address, _range, _reliable);
4135     expire_services(_address, _range, _reliable);
4136 }
4137 
memory_log_timer_cbk(boost::system::error_code const & _error)4138 void routing_manager_impl::memory_log_timer_cbk(
4139         boost::system::error_code const & _error) {
4140     if (_error) {
4141         return;
4142     }
4143 #ifndef _WIN32
4144     static const std::uint32_t its_pagesize = static_cast<std::uint32_t>(getpagesize() / 1024);
4145 #else
4146     static const std::uint32_t its_pagesize = 4096 / 1024;
4147 #endif
4148     std::FILE *its_file = std::fopen("/proc/self/statm", "r");
4149     if (!its_file) {
4150         VSOMEIP_ERROR << "memory_log_timer_cbk: couldn't open:"
4151                 << std::string(std::strerror(errno));
4152         return;
4153     }
4154     std::uint64_t its_size(0);
4155     std::uint64_t its_rsssize(0);
4156     std::uint64_t its_sharedpages(0);
4157     std::uint64_t its_text(0);
4158     std::uint64_t its_lib(0);
4159     std::uint64_t its_data(0);
4160     std::uint64_t its_dirtypages(0);
4161 
4162     if (EOF == std::fscanf(its_file, "%lu %lu %lu %lu %lu %lu %lu", &its_size,
4163                     &its_rsssize, &its_sharedpages, &its_text, &its_lib,
4164                     &its_data, &its_dirtypages)) {
4165         VSOMEIP_ERROR<< "memory_log_timer_cbk: error reading:"
4166                 << std::string(std::strerror(errno));
4167     }
4168     std::fclose(its_file);
4169 #ifndef _WIN32
4170     struct timespec cputs, monots;
4171     clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cputs);
4172     clock_gettime(CLOCK_MONOTONIC, &monots);
4173 #endif
4174 
4175     VSOMEIP_INFO << "memory usage: "
4176             << "VmSize " << std::dec << its_size * its_pagesize << " kB, "
4177             << "VmRSS " << std::dec << its_rsssize * its_pagesize << " kB, "
4178             << "shared pages " << std::dec << its_sharedpages * its_pagesize << " kB, "
4179             << "text " << std::dec << its_text * its_pagesize << " kB, "
4180             << "data " << std::dec << its_data * its_pagesize << " kB "
4181 #ifndef _WIN32
4182             << "| monotonic time: " << std::dec << monots.tv_sec << "."
4183             << std::dec << monots.tv_nsec << " cpu time: "
4184             << std::dec << cputs.tv_sec << "." << std::dec << cputs.tv_nsec
4185 #endif
4186             ;
4187 
4188     {
4189         std::lock_guard<std::mutex> its_lock(memory_log_timer_mutex_);
4190         boost::system::error_code ec;
4191         memory_log_timer_.expires_from_now(std::chrono::seconds(
4192                 configuration_->get_log_memory_interval()), ec);
4193         memory_log_timer_.async_wait(
4194                 std::bind(&routing_manager_impl::memory_log_timer_cbk, this,
4195                         std::placeholders::_1));
4196     }
4197 }
4198 
status_log_timer_cbk(boost::system::error_code const & _error)4199 void routing_manager_impl::status_log_timer_cbk(
4200         boost::system::error_code const & _error) {
4201     if (_error) {
4202         return;
4203     }
4204 
4205     ep_mgr_impl_->print_status();
4206     {
4207         std::lock_guard<std::mutex> its_lock(status_log_timer_mutex_);
4208         boost::system::error_code ec;
4209         status_log_timer_.expires_from_now(std::chrono::seconds(
4210                 configuration_->get_log_status_interval()), ec);
4211         status_log_timer_.async_wait(
4212                 std::bind(&routing_manager_impl::status_log_timer_cbk, this,
4213                         std::placeholders::_1));
4214     }
4215 }
4216 
4217 void
on_unsubscribe_ack(client_t _client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,remote_subscription_id_t _id)4218 routing_manager_impl::on_unsubscribe_ack(client_t _client,
4219         service_t _service, instance_t _instance, eventgroup_t _eventgroup,
4220         remote_subscription_id_t _id) {
4221     std::shared_ptr<eventgroupinfo> its_info
4222         = find_eventgroup(_service, _instance, _eventgroup);
4223     if (its_info) {
4224         update_remote_subscription_mutex_.lock();
4225         const auto its_subscription = its_info->get_remote_subscription(_id);
4226         if (its_subscription) {
4227             its_info->remove_remote_subscription(_id);
4228 
4229             std::lock_guard<std::mutex> its_lock(remote_subscribers_mutex_);
4230             remote_subscribers_[_service][_instance].erase(_client);
4231 
4232             if (its_info->get_remote_subscriptions().size() == 0) {
4233                 for (const auto &its_event : its_info->get_events()) {
4234                     bool has_remote_subscriber(false);
4235                     for (const auto &its_eventgroup : its_event->get_eventgroups()) {
4236                        const auto its_eventgroup_info
4237                            = find_eventgroup(_service, _instance, its_eventgroup);
4238                         if (its_eventgroup_info
4239                                 && its_eventgroup_info->get_remote_subscriptions().size() > 0) {
4240                             has_remote_subscriber = true;
4241                         }
4242                     }
4243 
4244                     if (!has_remote_subscriber && its_event->is_shadow()) {
4245                         its_event->unset_payload();
4246                     }
4247                 }
4248             }
4249         } else {
4250             VSOMEIP_ERROR << __func__
4251                 << ": Unknown StopSubscribe " << std::dec << _id << " for eventgroup ["
4252                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4253                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
4254                 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
4255         }
4256         update_remote_subscription_mutex_.unlock();
4257     } else {
4258         VSOMEIP_ERROR << __func__
4259                 << ": Received StopSubscribe for unknown eventgroup: ("
4260                 << std::hex << std::setw(4) << std::setfill('0') << _client << "): ["
4261                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4262                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
4263                 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
4264     }
4265 }
4266 
on_connect(const std::shared_ptr<endpoint> & _endpoint)4267 void routing_manager_impl::on_connect(const std::shared_ptr<endpoint>& _endpoint) {
4268     (void)_endpoint;
4269 }
on_disconnect(const std::shared_ptr<endpoint> & _endpoint)4270 void routing_manager_impl::on_disconnect(const std::shared_ptr<endpoint>& _endpoint) {
4271     (void)_endpoint;
4272 }
send_subscription(const client_t _offering_client,const service_t _service,const instance_t _instance,const eventgroup_t _eventgroup,const major_version_t _major,const std::set<client_t> & _clients,const remote_subscription_id_t _id)4273 void routing_manager_impl::send_subscription(
4274         const client_t _offering_client,
4275         const service_t _service, const instance_t _instance,
4276         const eventgroup_t _eventgroup, const major_version_t _major,
4277         const std::set<client_t> &_clients,
4278         const remote_subscription_id_t _id) {
4279     if (host_->get_client() == _offering_client) {
4280         auto self = shared_from_this();
4281         for (const auto& its_client : _clients) {
4282             host_->on_subscription(_service, _instance, _eventgroup, its_client, own_uid_, own_gid_, true,
4283                 [this, self, _service, _instance, _eventgroup, its_client, _id]
4284                         (const bool _is_accepted) {
4285                 try {
4286                     if (!_is_accepted) {
4287                         const auto its_callback = std::bind(
4288                                 &routing_manager_stub_host::on_subscribe_nack,
4289                                 std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4290                                 its_client, _service, _instance,
4291                                 _eventgroup, ANY_EVENT, _id, false);
4292                         io_.post(its_callback);
4293                     } else {
4294                         const auto its_callback = std::bind(
4295                                 &routing_manager_stub_host::on_subscribe_ack,
4296                                 std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4297                                 its_client, _service, _instance,
4298                                 _eventgroup, ANY_EVENT, _id);
4299                         io_.post(its_callback);
4300                     }
4301                 } catch (const std::exception &e) {
4302                     VSOMEIP_ERROR << __func__ << e.what();
4303                 }
4304             });
4305         }
4306     } else { // service hosted by local client
4307         for (const auto& its_client : _clients) {
4308             if (!stub_->send_subscribe(find_local(_offering_client), its_client,
4309                     _service, _instance, _eventgroup, _major, ANY_EVENT, _id)) {
4310                 try {
4311                     const auto its_callback = std::bind(
4312                             &routing_manager_stub_host::on_subscribe_nack,
4313                             std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4314                             its_client, _service, _instance, _eventgroup,
4315                             ANY_EVENT, _id, true);
4316                     io_.post(its_callback);
4317                 } catch (const std::exception &e) {
4318                     VSOMEIP_ERROR << __func__ << e.what();
4319                 }
4320             }
4321         }
4322     }
4323 }
4324 
cleanup_server_endpoint(service_t _service,const std::shared_ptr<endpoint> & _endpoint)4325 void routing_manager_impl::cleanup_server_endpoint(
4326         service_t _service, const std::shared_ptr<endpoint>& _endpoint) {
4327     if (_endpoint) {
4328         // Clear service_instances_, check whether any service still
4329         // uses this endpoint and clear server endpoint if no service
4330         // remains using it
4331         if (ep_mgr_impl_->remove_instance(_service, _endpoint.get())) {
4332             if (ep_mgr_impl_->remove_server_endpoint(
4333                     _endpoint->get_local_port(), _endpoint->is_reliable())) {
4334                 // Stop endpoint (close socket) to release its async_handlers!
4335                 _endpoint->stop();
4336             }
4337         }
4338     }
4339 }
4340 
pending_remote_offer_add(service_t _service,instance_t _instance)4341 pending_remote_offer_id_t routing_manager_impl::pending_remote_offer_add(
4342         service_t _service, instance_t _instance) {
4343     std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_);
4344     if (++pending_remote_offer_id_ == 0) {
4345         pending_remote_offer_id_++;
4346     }
4347     pending_remote_offers_[pending_remote_offer_id_] = std::make_pair(_service,
4348             _instance);
4349     return pending_remote_offer_id_;
4350 }
4351 
pending_remote_offer_remove(pending_remote_offer_id_t _id)4352 std::pair<service_t, instance_t> routing_manager_impl::pending_remote_offer_remove(
4353         pending_remote_offer_id_t _id) {
4354     std::lock_guard<std::mutex> its_lock(pending_remote_offers_mutex_);
4355     std::pair<service_t, instance_t> ret = std::make_pair(ANY_SERVICE,
4356                                                           ANY_INSTANCE);
4357     auto found_si = pending_remote_offers_.find(_id);
4358     if (found_si != pending_remote_offers_.end()) {
4359         ret = found_si->second;
4360         pending_remote_offers_.erase(found_si);
4361     }
4362     return ret;
4363 }
4364 
on_resend_provided_events_response(pending_remote_offer_id_t _id)4365 void routing_manager_impl::on_resend_provided_events_response(
4366         pending_remote_offer_id_t _id) {
4367     const std::pair<service_t, instance_t> its_service =
4368             pending_remote_offer_remove(_id);
4369     if (its_service.first != ANY_SERVICE) {
4370         // create server endpoint
4371         std::shared_ptr<serviceinfo> its_info = find_service(its_service.first,
4372                 its_service.second);
4373         if (its_info) {
4374             its_info->set_ttl(DEFAULT_TTL);
4375             init_service_info(its_service.first, its_service.second, true);
4376         }
4377     }
4378 }
4379 
print_stub_status() const4380 void routing_manager_impl::print_stub_status() const {
4381     stub_->print_endpoint_status();
4382 }
4383 
service_endpoint_connected(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,const std::shared_ptr<endpoint> & _endpoint,bool _unreliable_only)4384 void routing_manager_impl::service_endpoint_connected(
4385         service_t _service, instance_t _instance, major_version_t _major,
4386         minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint,
4387         bool _unreliable_only) {
4388 
4389     if (!_unreliable_only) {
4390         // Mark only TCP-only and TCP+UDP services available here
4391         // UDP-only services are already marked as available in add_routing_info
4392         on_availability(_service, _instance, true, _major, _minor);
4393         stub_->on_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
4394                 _major, _minor);
4395     }
4396 
4397     std::shared_ptr<boost::asio::steady_timer> its_timer =
4398             std::make_shared<boost::asio::steady_timer>(io_);
4399     boost::system::error_code ec;
4400     its_timer->expires_from_now(std::chrono::milliseconds(3), ec);
4401     if (!ec) {
4402         its_timer->async_wait(
4403                 std::bind(&routing_manager_impl::call_sd_endpoint_connected,
4404                         std::static_pointer_cast<routing_manager_impl>(
4405                                 shared_from_this()), std::placeholders::_1,
4406                         _service, _instance, _endpoint, its_timer));
4407     } else {
4408         VSOMEIP_ERROR << __func__ << " " << ec.message();
4409     }
4410 }
4411 
service_endpoint_disconnected(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,const std::shared_ptr<endpoint> & _endpoint)4412 void routing_manager_impl::service_endpoint_disconnected(
4413         service_t _service, instance_t _instance, major_version_t _major,
4414         minor_version_t _minor, const std::shared_ptr<endpoint>& _endpoint) {
4415     (void)_endpoint;
4416     on_availability(_service, _instance, false, _major, _minor);
4417     stub_->on_stop_offer_service(VSOMEIP_ROUTING_CLIENT, _service, _instance,
4418             _major, _minor);
4419     VSOMEIP_WARNING << __func__ << ": lost connection to remote service: ["
4420             << std::hex << std::setw(4) << std::setfill('0') << _service << "."
4421             << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
4422 }
4423 
4424 void
send_unsubscription(client_t _offering_client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,const std::set<client_t> & _removed,remote_subscription_id_t _id)4425 routing_manager_impl::send_unsubscription(client_t _offering_client,
4426         service_t _service, instance_t _instance,
4427         eventgroup_t _eventgroup, major_version_t _major,
4428         const std::set<client_t> &_removed,
4429         remote_subscription_id_t _id) {
4430 
4431     (void)_major; // TODO: Remove completely?
4432 
4433     if (host_->get_client() == _offering_client) {
4434         auto self = shared_from_this();
4435         for (const auto& its_client : _removed) {
4436             host_->on_subscription(_service, _instance,
4437                     _eventgroup, its_client, own_uid_, own_gid_, false,
4438                 [this, self, _service, _instance, _eventgroup,
4439                  its_client, _id]
4440                  (const bool _is_accepted) {
4441                     (void)_is_accepted;
4442                     try {
4443                         const auto its_callback = std::bind(
4444                             &routing_manager_stub_host::on_unsubscribe_ack,
4445                             std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4446                             its_client, _service, _instance, _eventgroup, _id);
4447                         io_.post(its_callback);
4448                     } catch (const std::exception &e) {
4449                         VSOMEIP_ERROR << __func__ << e.what();
4450                     }
4451                 }
4452             );
4453         }
4454     } else {
4455         for (const auto& its_client : _removed) {
4456             if (!stub_->send_unsubscribe(find_local(_offering_client), its_client,
4457                     _service, _instance, _eventgroup, ANY_EVENT, _id)) {
4458                 try {
4459                     const auto its_callback = std::bind(
4460                         &routing_manager_stub_host::on_unsubscribe_ack,
4461                         std::dynamic_pointer_cast<routing_manager_stub_host>(shared_from_this()),
4462                         its_client, _service, _instance, _eventgroup, _id);
4463                     io_.post(its_callback);
4464                 } catch (const std::exception &e) {
4465                     VSOMEIP_ERROR << __func__ << e.what();
4466                 }
4467             }
4468         }
4469     }
4470 }
4471 
4472 void
send_expired_subscription(client_t _offering_client,service_t _service,instance_t _instance,eventgroup_t _eventgroup,const std::set<client_t> & _removed,remote_subscription_id_t _id)4473 routing_manager_impl::send_expired_subscription(client_t _offering_client,
4474         service_t _service, instance_t _instance,
4475         eventgroup_t _eventgroup,
4476         const std::set<client_t> &_removed,
4477         remote_subscription_id_t _id) {
4478 
4479     if (host_->get_client() == _offering_client) {
4480         auto self = shared_from_this();
4481         for (const auto its_client : _removed) {
4482             host_->on_subscription(_service, _instance,
4483                     _eventgroup, its_client, own_uid_, own_gid_, false,
4484                     [] (const bool _subscription_accepted){
4485                         (void)_subscription_accepted;
4486                     });
4487         }
4488     } else {
4489         for (const auto its_client : _removed) {
4490             stub_->send_expired_subscription(find_local(_offering_client), its_client,
4491                     _service, _instance, _eventgroup, ANY_EVENT, _id);
4492         }
4493     }
4494 }
4495 
4496 bool
update_security_policy_configuration(uint32_t _uid,uint32_t _gid,const std::shared_ptr<policy> & _policy,const std::shared_ptr<payload> & _payload,const security_update_handler_t & _handler)4497 routing_manager_impl::update_security_policy_configuration(
4498         uint32_t _uid, uint32_t _gid,
4499         const std::shared_ptr<policy> &_policy,
4500         const std::shared_ptr<payload> &_payload,
4501         const security_update_handler_t &_handler) {
4502 
4503     if (stub_)
4504         return stub_->update_security_policy_configuration(_uid, _gid,
4505                           _policy, _payload, _handler);
4506 
4507     return (false);
4508 }
4509 
4510 bool
remove_security_policy_configuration(uint32_t _uid,uint32_t _gid,const security_update_handler_t & _handler)4511 routing_manager_impl::remove_security_policy_configuration(
4512         uint32_t _uid, uint32_t _gid,
4513         const security_update_handler_t &_handler) {
4514 
4515     if (stub_)
4516         return stub_->remove_security_policy_configuration(_uid, _gid,
4517                           _handler);
4518 
4519     return (false);
4520 }
4521 
insert_event_statistics(service_t _service,instance_t _instance,method_t _method,length_t _length)4522 bool routing_manager_impl::insert_event_statistics(service_t _service, instance_t _instance,
4523         method_t _method, length_t _length) {
4524 
4525     static uint32_t its_max_messages = configuration_->get_statistics_max_messages();
4526     std::lock_guard<std::mutex> its_lock(message_statistics_mutex_);
4527     const auto its_tuple = std::make_tuple(_service, _instance, _method);
4528     const auto its_main_s = message_statistics_.find(its_tuple);
4529     if (its_main_s != message_statistics_.end()) {
4530         // increase counter and calculate moving avergae for payload length
4531         its_main_s->second.avg_length_ =
4532                 (its_main_s->second.avg_length_ * its_main_s->second.counter_  + _length) /
4533                 (its_main_s->second.counter_ + 1);
4534         its_main_s->second.counter_++;
4535 
4536         if (its_tuple == message_to_discard_) {
4537             // check list for entry with least counter value
4538             uint32_t its_min_count(0xFFFFFFFF);
4539             auto its_tuple_to_discard = std::make_tuple(0xFFFF, 0xFFFF, 0xFFFF);
4540             for (const auto &it : message_statistics_) {
4541                 if (it.second.counter_ < its_min_count) {
4542                     its_min_count = it.second.counter_;
4543                     its_tuple_to_discard = it.first;
4544                 }
4545             }
4546             if (its_min_count != 0xFFFF
4547                     && its_min_count < its_main_s->second.counter_) {
4548                 // update message to discard with current message
4549                 message_to_discard_ = its_tuple;
4550             }
4551         }
4552     } else {
4553         if (message_statistics_.size() < its_max_messages) {
4554             message_statistics_[its_tuple] = {1, _length};
4555             message_to_discard_ = its_tuple;
4556         } else {
4557             // no slot empty
4558             const auto it = message_statistics_.find(message_to_discard_);
4559             if (it != message_statistics_.end()
4560                     && it->second.counter_ == 1) {
4561                 message_statistics_.erase(message_to_discard_);
4562                 message_statistics_[its_tuple] = {1, _length};
4563                 message_to_discard_ = its_tuple;
4564             } else {
4565                 // ignore message
4566                 ignored_statistics_counter_++;
4567                 return false;
4568             }
4569         }
4570     }
4571     return true;
4572 }
4573 
statistics_log_timer_cbk(boost::system::error_code const & _error)4574 void routing_manager_impl::statistics_log_timer_cbk(boost::system::error_code const & _error) {
4575     if (!_error) {
4576         static uint32_t its_interval = configuration_->get_statistics_interval();
4577         its_interval = its_interval >= 1000 ? its_interval : 1000;
4578         static uint32_t its_min_freq = configuration_->get_statistics_min_freq();
4579         std::stringstream its_log;
4580         {
4581             std::lock_guard<std::mutex> its_lock(message_statistics_mutex_);
4582             for (const auto &s : message_statistics_) {
4583                 if (s.second.counter_ / (its_interval / 1000) >= its_min_freq) {
4584                     uint16_t its_subscribed(0);
4585                     std::shared_ptr<event> its_event = find_event(std::get<0>(s.first), std::get<1>(s.first), std::get<2>(s.first));
4586                     if (its_event) {
4587                         if (!its_event->is_provided()) {
4588                             its_subscribed = static_cast<std::uint16_t>(its_event->get_subscribers().size());
4589                         }
4590                     }
4591                     its_log << std::hex << std::setw(4) << std::setfill('0')
4592                                     << std::get<0>(s.first) << "."
4593                                     << std::get<1>(s.first) << "."
4594                                     << std::get<2>(s.first) << ": #="
4595                                     << std::dec << s.second.counter_ << " L="
4596                                     << s.second.avg_length_ << " S="
4597                                     << std::dec << its_subscribed << ", ";
4598                 }
4599             }
4600 
4601             if (ignored_statistics_counter_) {
4602                 its_log << std::dec << " #ignored: " << ignored_statistics_counter_;
4603             }
4604 
4605             message_statistics_.clear();
4606             message_to_discard_ = std::make_tuple(0x00, 0x00, 0x00);
4607             ignored_statistics_counter_ = 0;
4608         }
4609 
4610         if (its_log.str().length() > 0) {
4611             VSOMEIP_INFO << "Received events statistics: [" << its_log.str() << "]";
4612         }
4613 
4614         {
4615             std::lock_guard<std::mutex> its_lock(statistics_log_timer_mutex_);
4616             statistics_log_timer_.expires_from_now(std::chrono::milliseconds(its_interval));
4617             statistics_log_timer_.async_wait(
4618                     std::bind(&routing_manager_impl::statistics_log_timer_cbk,
4619                               this, std::placeholders::_1));
4620         }
4621     }
4622 }
4623 
send_suspend() const4624 void routing_manager_impl::send_suspend() const {
4625 
4626     stub_->send_suspend();
4627 }
4628 
4629 } // namespace vsomeip_v3
4630