1 // Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <future>
7 #include <thread>
8 #include <iomanip>
9 #include <iostream>
10 
11 #include <boost/exception/diagnostic_information.hpp>
12 
13 #ifndef _WIN32
14 #include <dlfcn.h>
15 #include <sys/syscall.h>
16 #endif
17 
18 #include <vsomeip/defines.hpp>
19 #include <vsomeip/runtime.hpp>
20 #include <vsomeip/plugins/application_plugin.hpp>
21 #include <vsomeip/plugins/pre_configuration_plugin.hpp>
22 #include <vsomeip/internal/logger.hpp>
23 
24 #include "../include/application_impl.hpp"
25 #ifdef VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
26 #include "../../configuration/include/configuration_impl.hpp"
27 #else
28 #include "../../configuration/include/configuration.hpp"
29 #include "../../configuration/include/configuration_plugin.hpp"
30 #endif // VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
31 #include "../../message/include/serializer.hpp"
32 #include "../../routing/include/routing_manager_impl.hpp"
33 #include "../../routing/include/routing_manager_proxy.hpp"
34 #include "../../utility/include/utility.hpp"
35 #include "../../tracing/include/connector_impl.hpp"
36 #include "../../plugin/include/plugin_manager_impl.hpp"
37 #include "../../endpoints/include/endpoint.hpp"
38 #include "../../security/include/security.hpp"
39 
40 namespace vsomeip_v3 {
41 
#ifdef ANDROID
// Out-of-line (empty) definition of the configuration destructor; it is
// only compiled when ANDROID is defined.
configuration::~configuration() {
}
#endif // ANDROID
45 
46 uint32_t application_impl::app_counter__ = 0;
47 std::mutex application_impl::app_counter_mutex__;
48 
application_impl(const std::string & _name)49 application_impl::application_impl(const std::string &_name)
50         : runtime_(runtime::get()),
51           client_(VSOMEIP_CLIENT_UNSET),
52           session_(0),
53           is_initialized_(false), name_(_name),
54           work_(std::make_shared<boost::asio::io_service::work>(io_)),
55           routing_(0),
56           state_(state_type_e::ST_DEREGISTERED),
57           security_mode_(security_mode_e::SM_OFF),
58 #ifdef VSOMEIP_ENABLE_SIGNAL_HANDLING
59           signals_(io_, SIGINT, SIGTERM),
60           catched_signal_(false),
61 #endif
62           is_dispatching_(false),
63           max_dispatchers_(VSOMEIP_MAX_DISPATCHERS),
64           max_dispatch_time_(VSOMEIP_MAX_DISPATCH_TIME),
65           stopped_(false),
66           block_stopping_(false),
67           is_routing_manager_host_(false),
68           stopped_called_(false),
69           watchdog_timer_(io_),
70           client_side_logging_(false)
71 #ifdef VSOMEIP_HAS_SESSION_HANDLING_CONFIG
72           , has_session_handling_(true)
73 #endif // VSOMEIP_HAS_SESSION_HANDLING_CONFIG
74 {
75     own_uid_ = ANY_UID;
76     own_gid_ = ANY_GID;
77 #ifndef _WIN32
78     own_uid_ = getuid();
79     own_gid_ = getgid();
80 #endif
81 }
82 
~application_impl()83 application_impl::~application_impl() {
84     runtime_->remove_application(name_);
85     try {
86         if (stop_thread_.joinable()) {
87             stop_thread_.detach();
88         }
89     } catch (const std::exception& e) {
90         std::cerr << __func__ << " catched exception (shutdown): " << e.what() << std::endl;
91     }
92 
93     try {
94         std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
95         for (const auto& t : io_threads_) {
96             if (t->joinable()) {
97                 t->detach();
98             }
99         }
100         io_threads_.clear();
101     } catch (const std::exception& e) {
102         std::cerr << __func__ << " catched exception (io threads): " << e.what() << std::endl;
103     }
104 
105     try {
106         std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
107         for (const auto& its_dispatcher : dispatchers_) {
108             if (its_dispatcher.second->joinable()) {
109                 its_dispatcher.second->detach();
110             }
111         }
112         dispatchers_.clear();
113     } catch (const std::exception& e) {
114         std::cerr << __func__ << " catched exception (dispatchers): " << e.what() << std::endl;
115     }
116 }
117 
init()118 bool application_impl::init() {
119     if(is_initialized_) {
120         VSOMEIP_WARNING << "Trying to initialize an already initialized application.";
121         return true;
122     }
123     // Application name
124     if (name_ == "") {
125         const char *its_name = getenv(VSOMEIP_ENV_APPLICATION_NAME);
126         if (nullptr != its_name) {
127             name_ = its_name;
128         }
129     }
130 
131     std::string configuration_path;
132 
133     // load configuration from module
134     std::string config_module = "";
135     const char *its_config_module = getenv(VSOMEIP_ENV_CONFIGURATION_MODULE);
136     if (nullptr != its_config_module) {
137         // TODO: Add loading of custom configuration module
138     } else { // load default module
139 #ifndef VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
140         auto its_plugin = plugin_manager::get()->get_plugin(
141                 plugin_type_e::CONFIGURATION_PLUGIN, VSOMEIP_CFG_LIBRARY);
142         if (its_plugin) {
143             auto its_configuration_plugin
144                 = std::dynamic_pointer_cast<configuration_plugin>(its_plugin);
145             if (its_configuration_plugin) {
146                 configuration_ = its_configuration_plugin->get_configuration(name_);
147                 VSOMEIP_INFO << "Configuration module loaded.";
148             } else {
149                 std::cerr << "Invalid configuration module!" << std::endl;
150                 std::exit(EXIT_FAILURE);
151             }
152         } else {
153             std::cerr << "Configuration module could not be loaded!" << std::endl;
154             std::exit(EXIT_FAILURE);
155         }
156 #else
157         configuration_ = std::dynamic_pointer_cast<configuration>(
158                 std::make_shared<vsomeip_v3::cfg::configuration_impl>());
159         if (configuration_path.length()) {
160             configuration_->set_configuration_path(configuration_path);
161         }
162         configuration_->load(name_);
163 #endif // VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
164     }
165 
166     // Set security mode
167     auto its_security = security::get();
168     if (its_security->is_enabled()) {
169         if (its_security->is_audit()) {
170             security_mode_ = security_mode_e::SM_AUDIT;
171         } else {
172             security_mode_ = security_mode_e::SM_ON;
173         }
174     } else {
175         security_mode_ = security_mode_e::SM_OFF;
176     }
177 
178     const char *client_side_logging = getenv(VSOMEIP_ENV_CLIENTSIDELOGGING);
179     if (client_side_logging != nullptr) {
180         client_side_logging_ = true;
181         VSOMEIP_INFO << "Client side logging for application: " << name_
182                 << " is enabled";
183 
184         if ('\0' != *client_side_logging) {
185             std::stringstream its_converter(client_side_logging);
186             if ('"' == its_converter.peek()) {
187                 its_converter.get(); // skip quote
188             }
189             uint16_t val(0xffffu);
190             bool stop_parsing(false);
191             do {
192                 const uint16_t prev_val(val);
193                 its_converter >> std::hex >> std::setw(4) >> val;
194                 if (its_converter.good()) {
195                     const std::stringstream::int_type c = its_converter.eof()?'\0':its_converter.get();
196                     switch (c) {
197                     case '"':
198                     case '.':
199                     case ':':
200                     case ' ':
201                     case '\0': {
202                             if ('.' != c) {
203                                 if (0xffffu == prev_val) {
204                                     VSOMEIP_INFO << "+filter "
205                                     << std::hex << std::setw(4) << std::setfill('0') << val;
206                                     client_side_logging_filter_.insert(std::make_tuple(val, ANY_INSTANCE));
207                                 } else {
208                                     VSOMEIP_INFO << "+filter "
209                                     << std::hex << std::setw(4) << std::setfill('0') << prev_val << "."
210                                     << std::hex << std::setw(4) << std::setfill('0') << val;
211                                     client_side_logging_filter_.insert(std::make_tuple(prev_val, val));
212                                 }
213                                 val = 0xffffu;
214                             }
215                         }
216                         break;
217                     default:
218                         stop_parsing = true;
219                         break;
220                     }
221                 }
222             }
223             while (!stop_parsing && its_converter.good());
224         }
225     }
226 
227     std::shared_ptr<configuration> its_configuration = get_configuration();
228     if (its_configuration) {
229         VSOMEIP_INFO << "Initializing vsomeip application \"" << name_ << "\".";
230         client_ = its_configuration->get_id(name_);
231 
232         // Max dispatchers is the configured maximum number of dispatchers and
233         // the main dispatcher
234         max_dispatchers_ = its_configuration->get_max_dispatchers(name_) + 1;
235         max_dispatch_time_ = its_configuration->get_max_dispatch_time(name_);
236 
237 #ifdef VSOMEIP_HAS_SESSION_HANDLING_CONFIG
238         has_session_handling_ = its_configuration->has_session_handling(name_);
239         if (!has_session_handling_)
240             VSOMEIP_INFO << "application: " << name_
241                 << " has session handling switched off!";
242 #endif // VSOMEIP_HAS_SESSION_HANDLING_CONFIG
243 
244         std::string its_routing_host = its_configuration->get_routing_host();
245         if (its_routing_host != "") {
246             is_routing_manager_host_ = (its_routing_host == name_);
247             if (is_routing_manager_host_ &&
248                     !utility::is_routing_manager(configuration_)) {
249 #ifndef VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
250                 VSOMEIP_ERROR << "application: " << name_ << " configured as "
251                         "routing but other routing manager present. Won't "
252                         "instantiate routing";
253                 is_routing_manager_host_ = false;
254                 return false;
255 #else
256             is_routing_manager_host_ = true;
257 #endif // VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
258             }
259         } else {
260             is_routing_manager_host_ = utility::is_routing_manager(configuration_);
261         }
262 
263         if (is_routing_manager_host_) {
264             VSOMEIP_INFO << "Instantiating routing manager [Host].";
265             if (client_ == VSOMEIP_CLIENT_UNSET) {
266                 client_ = static_cast<client_t>(
267                           (configuration_->get_diagnosis_address() << 8)
268                         & configuration_->get_diagnosis_mask());
269                 utility::request_client_id(configuration_, name_, client_);
270             }
271             routing_ = std::make_shared<routing_manager_impl>(this);
272         } else {
273             VSOMEIP_INFO << "Instantiating routing manager [Proxy].";
274             routing_ = std::make_shared<routing_manager_proxy>(this, client_side_logging_, client_side_logging_filter_);
275         }
276 
277         routing_->set_client(client_);
278         routing_->init();
279 
280 #ifdef USE_DLT
281         // Tracing
282         std::shared_ptr<trace::connector_impl> its_connector
283             = trace::connector_impl::get();
284         std::shared_ptr<cfg::trace> its_trace_configuration
285             = its_configuration->get_trace();
286         its_connector->configure(its_trace_configuration);
287 #endif
288 
289         VSOMEIP_INFO << "Application(" << (name_ != "" ? name_ : "unnamed")
290                 << ", " << std::hex << std::setw(4) << std::setfill('0') << client_
291                 << ") is initialized ("
292                 << std::dec << max_dispatchers_ << ", "
293                 << std::dec << max_dispatch_time_ << ").";
294 
295         is_initialized_ = true;
296     }
297 
298 #ifdef VSOMEIP_ENABLE_SIGNAL_HANDLING
299     if (is_initialized_) {
300         signals_.add(SIGINT);
301         signals_.add(SIGTERM);
302 
303         // Register signal handler
304         auto its_signal_handler =
305                 [this] (boost::system::error_code const &_error, int _signal) {
306                     if (!_error) {
307                         switch (_signal) {
308                             case SIGTERM:
309                             case SIGINT:
310                                 catched_signal_ = true;
311                                 stop();
312                                 break;
313                             default:
314                                 break;
315                         }
316                     }
317                 };
318         signals_.async_wait(its_signal_handler);
319     }
320 #endif
321 
322     if (configuration_) {
323         auto its_plugins = configuration_->get_plugins(name_);
324         auto its_app_plugin_info = its_plugins.find(plugin_type_e::APPLICATION_PLUGIN);
325         if (its_app_plugin_info != its_plugins.end()) {
326             for (auto its_library : its_app_plugin_info->second) {
327                 auto its_application_plugin = plugin_manager::get()->get_plugin(
328                         plugin_type_e::APPLICATION_PLUGIN, its_library);
329                 if (its_application_plugin) {
330                     VSOMEIP_INFO << "Client 0x" << std::hex << get_client()
331                             << " Loading plug-in library: " << its_library << " succeeded!";
332                     std::dynamic_pointer_cast<application_plugin>(its_application_plugin)->
333                             on_application_state_change(name_, application_plugin_state_e::STATE_INITIALIZED);
334                 }
335             }
336         }
337     } else {
338         std::cerr << "Configuration module could not be loaded!" << std::endl;
339         std::exit(EXIT_FAILURE);
340     }
341 
342     return is_initialized_;
343 }
344 
start()345 void application_impl::start() {
346 #ifndef _WIN32
347     if (getpid() != static_cast<pid_t>(syscall(SYS_gettid))) {
348         // only set threadname if calling thread isn't the main thread
349         std::stringstream s;
350         s << std::hex << std::setw(4) << std::setfill('0') << client_
351                 << "_io" << std::setw(2) << std::setfill('0') << 0;
352         pthread_setname_np(pthread_self(),s.str().c_str());
353     }
354 #endif
355     if (!is_initialized_) {
356         VSOMEIP_ERROR << "Trying to start an unintialized application.";
357         return;
358     }
359 
360     const size_t io_thread_count = configuration_->get_io_thread_count(name_);
361     const int io_thread_nice_level = configuration_->get_io_thread_nice_level(name_);
362     {
363         std::lock_guard<std::mutex> its_lock(start_stop_mutex_);
364         if (io_.stopped()) {
365             io_.reset();
366         } else if(stop_thread_.joinable()) {
367             VSOMEIP_ERROR << "Trying to start an already started application.";
368             return;
369         }
370         if (stopped_) {
371             {
372                 std::lock_guard<std::mutex> its_lock_start_stop(block_stop_mutex_);
373                 block_stopping_ = true;
374                 block_stop_cv_.notify_all();
375             }
376 
377             stopped_ = false;
378             return;
379         }
380         stopped_ = false;
381         stopped_called_ = false;
382         VSOMEIP_INFO << "Starting vsomeip application \"" << name_ << "\" ("
383                 << std::hex << std::setw(4) << std::setfill('0') << client_
384                 << ") using "  << std::dec << io_thread_count << " threads"
385 #ifndef _WIN32
386                 << " I/O nice " << io_thread_nice_level
387 #endif
388         ;
389 
390         start_caller_id_ = std::this_thread::get_id();
391         {
392             std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
393             is_dispatching_ = true;
394             auto its_main_dispatcher = std::make_shared<std::thread>(
395                     std::bind(&application_impl::main_dispatch, shared_from_this()));
396             dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher;
397         }
398 
399         if (stop_thread_.joinable()) {
400             stop_thread_.join();
401         }
402         stop_thread_= std::thread(&application_impl::shutdown, shared_from_this());
403 
404         if (routing_)
405             routing_->start();
406 
407         for (size_t i = 0; i < io_thread_count - 1; i++) {
408             std::shared_ptr<std::thread> its_thread
409                 = std::make_shared<std::thread>([this, i, io_thread_nice_level] {
410                     VSOMEIP_INFO << "io thread id from application: "
411                             << std::hex << std::setw(4) << std::setfill('0')
412                             << client_ << " (" << name_ << ") is: " << std::hex
413                             << std::this_thread::get_id()
414                     #ifndef _WIN32
415                             << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
416                     #endif
417                             ;
418                     #ifndef _WIN32
419                         {
420                             std::stringstream s;
421                             s << std::hex << std::setw(4) << std::setfill('0')
422                                 << client_ << "_io" << std::setw(2)
423                                 << std::setfill('0') << i+1;
424                             pthread_setname_np(pthread_self(),s.str().c_str());
425                         }
426                         if ((VSOMEIP_IO_THREAD_NICE_LEVEL != io_thread_nice_level) && (io_thread_nice_level != nice(io_thread_nice_level))) {
427                             VSOMEIP_WARNING << "nice(" << io_thread_nice_level << ") failed " << errno << " for " << std::this_thread::get_id();
428                         }
429                     #endif
430                     try {
431                       io_.run();
432                     } catch (const std::exception &e) {
433                         VSOMEIP_ERROR << "application_impl::start() "
434                                 "catched exception: " << e.what();
435                     }
436                   });
437             io_threads_.insert(its_thread);
438         }
439     }
440 
441     auto its_plugins = configuration_->get_plugins(name_);
442     auto its_app_plugin_info = its_plugins.find(plugin_type_e::APPLICATION_PLUGIN);
443     if (its_app_plugin_info != its_plugins.end()) {
444         for (const auto& its_library : its_app_plugin_info->second) {
445             auto its_application_plugin = plugin_manager::get()->get_plugin(
446                     plugin_type_e::APPLICATION_PLUGIN, its_library);
447             if (its_application_plugin) {
448                 std::dynamic_pointer_cast<application_plugin>(its_application_plugin)->
449                         on_application_state_change(name_, application_plugin_state_e::STATE_STARTED);
450             }
451         }
452 
453     }
454 
455     app_counter_mutex__.lock();
456     app_counter__++;
457     app_counter_mutex__.unlock();
458     VSOMEIP_INFO << "io thread id from application: "
459             << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
460             << name_ << ") is: " << std::hex << std::this_thread::get_id()
461 #ifndef _WIN32
462             << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
463 #endif
464     ;
465 #ifndef _WIN32
466     if ((VSOMEIP_IO_THREAD_NICE_LEVEL != io_thread_nice_level) && (io_thread_nice_level != nice(io_thread_nice_level))) {
467         VSOMEIP_WARNING << "nice(" << io_thread_nice_level << ") failed " << errno << " for " << std::this_thread::get_id();
468     }
469 #endif
470     try {
471         io_.run();
472 
473         if (stop_thread_.joinable()) {
474             stop_thread_.join();
475         }
476 
477     } catch (const std::exception &e) {
478         VSOMEIP_ERROR << "application_impl::start() catched exception: " << e.what();
479     }
480 
481     {
482         std::lock_guard<std::mutex> its_lock_start_stop(block_stop_mutex_);
483         block_stopping_ = true;
484         block_stop_cv_.notify_all();
485     }
486 
487     {
488         std::lock_guard<std::mutex> its_lock(start_stop_mutex_);
489         stopped_ = false;
490     }
491 
492     app_counter_mutex__.lock();
493     app_counter__--;
494 
495 #ifdef VSOMEIP_ENABLE_SIGNAL_HANDLING
496     if (catched_signal_ && !app_counter__) {
497         app_counter_mutex__.unlock();
498         VSOMEIP_INFO << "Exiting vsomeip application...";
499         exit(0);
500     }
501 #endif
502     app_counter_mutex__.unlock();
503 }
504 
stop()505 void application_impl::stop() {
506 
507     VSOMEIP_INFO << "Stopping vsomeip application \"" << name_ << "\" ("
508                 << std::hex << std::setw(4) << std::setfill('0') << client_ << ").";
509 
510     bool block = true;
511     {
512         std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
513         if (stopped_ || stopped_called_) {
514             return;
515         }
516         stop_caller_id_ = std::this_thread::get_id();
517         stopped_ = true;
518         stopped_called_ = true;
519         for (const auto& thread : io_threads_) {
520             if (thread->get_id() == std::this_thread::get_id()) {
521                 block = false;
522             }
523         }
524         if (start_caller_id_ == stop_caller_id_) {
525             block = false;
526         }
527     }
528     auto its_plugins = configuration_->get_plugins(name_);
529     auto its_app_plugin_info = its_plugins.find(plugin_type_e::APPLICATION_PLUGIN);
530     if (its_app_plugin_info != its_plugins.end()) {
531         for (const auto& its_library : its_app_plugin_info->second) {
532             auto its_application_plugin = plugin_manager::get()->get_plugin(
533                     plugin_type_e::APPLICATION_PLUGIN, its_library);
534             if (its_application_plugin) {
535                 std::dynamic_pointer_cast<application_plugin>(its_application_plugin)->
536                         on_application_state_change(name_, application_plugin_state_e::STATE_STOPPED);
537             }
538         }
539 
540     }
541 
542     {
543         std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
544         stop_cv_.notify_one();
545     }
546 
547     if (block) {
548         std::unique_lock<std::mutex> block_stop_lock(block_stop_mutex_);
549         while (!block_stopping_) {
550             block_stop_cv_.wait(block_stop_lock);
551         }
552         block_stopping_ = false;
553     }
554 }
555 
process(int _number)556 void application_impl::process(int _number) {
557     (void)_number;
558     VSOMEIP_ERROR << "application::process is not (yet) implemented.";
559 }
560 
get_security_mode() const561 security_mode_e application_impl::get_security_mode() const {
562     return security_mode_;
563 }
564 
offer_service(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)565 void application_impl::offer_service(service_t _service, instance_t _instance,
566         major_version_t _major, minor_version_t _minor) {
567     if (routing_)
568         routing_->offer_service(client_, _service, _instance, _major, _minor);
569 }
570 
stop_offer_service(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)571 void application_impl::stop_offer_service(service_t _service, instance_t _instance,
572     major_version_t _major, minor_version_t _minor) {
573     if (routing_)
574         routing_->stop_offer_service(client_, _service, _instance, _major, _minor);
575 }
576 
request_service(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)577 void application_impl::request_service(service_t _service, instance_t _instance,
578         major_version_t _major, minor_version_t _minor) {
579     if (routing_)
580         routing_->request_service(client_, _service, _instance, _major, _minor);
581 }
582 
release_service(service_t _service,instance_t _instance)583 void application_impl::release_service(service_t _service,
584         instance_t _instance) {
585     if (routing_)
586         routing_->release_service(client_, _service, _instance);
587 }
588 
subscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,event_t _event)589 void application_impl::subscribe(service_t _service, instance_t _instance,
590                                  eventgroup_t _eventgroup,
591                                  major_version_t _major,
592                                  event_t _event) {
593     if (routing_) {
594         bool send_back_cached(false);
595         bool send_back_cached_group(false);
596         check_send_back_cached_event(_service, _instance, _event, _eventgroup,
597                 &send_back_cached, &send_back_cached_group);
598 
599         if (send_back_cached) {
600             send_back_cached_event(_service, _instance, _event);
601         } else if(send_back_cached_group) {
602             send_back_cached_eventgroup(_service, _instance, _eventgroup);
603         }
604 
605         if (check_subscription_state(_service, _instance, _eventgroup, _event)) {
606             routing_->subscribe(client_, own_uid_, own_gid_, _service, _instance, _eventgroup, _major,
607                     _event);
608         }
609     }
610 }
611 
unsubscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup)612 void application_impl::unsubscribe(service_t _service, instance_t _instance,
613         eventgroup_t _eventgroup) {
614     remove_subscription(_service, _instance, _eventgroup, ANY_EVENT);
615     if (routing_)
616         routing_->unsubscribe(client_, own_uid_, own_gid_, _service, _instance, _eventgroup, ANY_EVENT);
617 }
618 
unsubscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)619 void application_impl::unsubscribe(service_t _service, instance_t _instance,
620         eventgroup_t _eventgroup, event_t _event) {
621     remove_subscription(_service, _instance, _eventgroup, _event);
622     if (routing_)
623         routing_->unsubscribe(client_, own_uid_, own_gid_, _service, _instance, _eventgroup, _event);
624 }
625 
is_available(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const626 bool application_impl::is_available(
627         service_t _service, instance_t _instance,
628         major_version_t _major, minor_version_t _minor) const {
629     std::lock_guard<std::recursive_mutex> its_lock(availability_mutex_);
630     return is_available_unlocked(_service, _instance, _major, _minor);
631 }
632 
is_available_unlocked(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const633 bool application_impl::is_available_unlocked(
634         service_t _service, instance_t _instance,
635         major_version_t _major, minor_version_t _minor) const {
636 
637     bool is_available(false);
638 
639     auto check_major_minor = [&](const std::map<instance_t,
640                 std::map<major_version_t,
641                     minor_version_t >>::const_iterator &_found_instance) {
642         auto found_major = _found_instance->second.find(_major);
643         if (found_major != _found_instance->second.end()) {
644             if (_minor <= found_major->second || _minor == ANY_MINOR
645                     || _minor == DEFAULT_MINOR) {
646                 is_available = true;
647             }
648         } else if ((_major == DEFAULT_MAJOR || _major == ANY_MAJOR)) {
649             for (const auto &found_major : _found_instance->second) {
650                 if (_minor == DEFAULT_MINOR || _minor == ANY_MINOR) {
651                     is_available = true;
652                     break;
653                 } else if (_minor <= found_major.second) {
654                     is_available = true;
655                     break;
656                 }
657             }
658         }
659     };
660     auto found_service = available_.find(_service);
661     if (found_service != available_.end()) {
662         auto found_instance = found_service->second.find(_instance);
663         if (found_instance != found_service->second.end()) {
664             check_major_minor(found_instance);
665         } else if (_instance == ANY_INSTANCE) {
666             for (auto it = found_service->second.cbegin();
667                     it != found_service->second.cend(); it++) {
668                 check_major_minor(it);
669                 if (is_available) {
670                     break;
671                 }
672             }
673         }
674     } else if (_service == ANY_SERVICE) {
675         for (const auto &found_service : available_) {
676             auto found_instance = found_service.second.find(_instance);
677             if (found_instance != found_service.second.end()) {
678                 check_major_minor(found_instance);
679                 if (is_available) {
680                     break;
681                 }
682             } else if (_instance == ANY_INSTANCE) {
683                 for (auto it = found_service.second.cbegin();
684                         it != found_service.second.cend(); it++) {
685                     check_major_minor(it);
686                     if (is_available) {
687                         break;
688                     }
689                 }
690             }
691             if (is_available) {
692                 break;
693             }
694         }
695     }
696     return is_available;
697 }
698 
are_available(available_t & _available,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const699 bool application_impl::are_available(
700         available_t &_available,
701         service_t _service, instance_t _instance,
702         major_version_t _major, minor_version_t _minor) const {
703     std::lock_guard<std::recursive_mutex> its_lock(availability_mutex_);
704     return are_available_unlocked(_available, _service, _instance, _major, _minor);
705 }
706 
are_available_unlocked(available_t & _available,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const707 bool application_impl::are_available_unlocked(available_t &_available,
708                             service_t _service, instance_t _instance,
709                             major_version_t _major, minor_version_t _minor) const {
710 
711     //find available services
712     if(_service == ANY_SERVICE) {
713         //add all available services
714         for(auto its_available_services_it = available_.begin();
715                 its_available_services_it != available_.end();
716                 ++its_available_services_it)
717             _available[its_available_services_it->first];
718     } else {
719         // check if specific service is available
720         if(available_.find(_service) != available_.end())
721             _available[_service];
722     }
723 
724     //find available instances
725     //iterate through found available services
726     for(auto its_available_services_it = _available.begin();
727             its_available_services_it != _available.end();
728             ++its_available_services_it) {
729         //get available service
730         auto found_available_service = available_.find(its_available_services_it->first);
731         if (found_available_service != available_.end()) {
732             if(_instance == ANY_INSTANCE) {
733                 //add all available instances
734                 for(auto its_available_instances_it = found_available_service->second.begin();
735                         its_available_instances_it != found_available_service->second.end();
736                         ++its_available_instances_it)
737                     _available[its_available_services_it->first][its_available_instances_it->first];
738             } else {
739                 if(found_available_service->second.find(_instance) != found_available_service->second.end())
740                     _available[its_available_services_it->first][_instance];
741             }
742         }
743     }
744 
745     //find major versions
746     //iterate through found available services
747     for(auto its_available_services_it = _available.begin();
748             its_available_services_it != _available.end();
749             ++its_available_services_it) {
750         //get available service
751          auto found_available_service = available_.find(its_available_services_it->first);
752          if (found_available_service != available_.end()) {
753              //iterate through found available instances
754              for(auto its_available_instances_it = found_available_service->second.begin();
755                      its_available_instances_it != found_available_service->second.end();
756                      ++its_available_instances_it) {
757                  //get available instance
758                  auto found_available_instance = found_available_service->second.find(its_available_instances_it->first);
759                  if(found_available_instance != found_available_service->second.end()) {
760                      if(_major == ANY_MAJOR || _major == DEFAULT_MAJOR) {
761                          //add all major versions
762                          for(auto its_available_major_it = found_available_instance->second.begin();
763                                  its_available_major_it != found_available_instance->second.end();
764                                  ++its_available_major_it)
765                              _available[its_available_services_it->first][its_available_instances_it->first][its_available_major_it->first];
766                      } else {
767                          if(found_available_instance->second.find(_major) != found_available_instance->second.end())
768                              _available[its_available_services_it->first][its_available_instances_it->first][_major];
769                      }
770                  }
771              }
772          }
773     }
774 
775     //find minor
776     //iterate through found available services
777     auto its_available_services_it = _available.begin();
778     while(its_available_services_it != _available.end()) {
779         bool found_minor(false);
780         //get available service
781          auto found_available_service = available_.find(its_available_services_it->first);
782          if (found_available_service != available_.end()) {
783              //iterate through found available instances
784              for(auto its_available_instances_it = found_available_service->second.begin();
785                      its_available_instances_it != found_available_service->second.end();
786                      ++its_available_instances_it) {
787                  //get available instance
788                  auto found_available_instance = found_available_service->second.find(its_available_instances_it->first);
789                  if(found_available_instance != found_available_service->second.end()) {
790                      //iterate through found available major version
791                      for(auto its_available_major_it = found_available_instance->second.begin();
792                              its_available_major_it != found_available_instance->second.end();
793                              ++its_available_major_it) {
794                          //get available major version
795                          auto found_available_major = found_available_instance->second.find(its_available_major_it->first);
796                          if(found_available_major != found_available_instance->second.end()) {
797                              if(_minor == ANY_MINOR || _minor == DEFAULT_MINOR
798                                      || _minor <= found_available_major->second) {
799                                  //add minor version
800                                  _available[its_available_services_it->first][its_available_instances_it->first][its_available_major_it->first] = found_available_major->second;
801                                  found_minor = true;
802                              }
803                          }
804                      }
805                  }
806              }
807          }
808          if(found_minor)
809              ++its_available_services_it;
810          else
811              its_available_services_it = _available.erase(its_available_services_it);
812     }
813 
814     if(_available.empty()) {
815         _available[_service][_instance][_major] = _minor ;
816         return false;
817     }
818     return true;
819 }
820 
send(std::shared_ptr<message> _message)821 void application_impl::send(std::shared_ptr<message> _message) {
822     bool is_request = utility::is_request(_message);
823     if (client_side_logging_
824         && (client_side_logging_filter_.empty()
825             || (1 == client_side_logging_filter_.count(std::make_tuple(_message->get_service(), ANY_INSTANCE)))
826             || (1 == client_side_logging_filter_.count(std::make_tuple(_message->get_service(), _message->get_instance()))))) {
827         VSOMEIP_INFO << "application_impl::send: ("
828             << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
829             << std::hex << std::setw(4) << std::setfill('0') << _message->get_service() << "."
830             << std::hex << std::setw(4) << std::setfill('0') << _message->get_instance() << "."
831             << std::hex << std::setw(4) << std::setfill('0') << _message->get_method() << ":"
832             << std::hex << std::setw(4) << std::setfill('0')
833             << ((is_request) ? session_ : _message->get_session()) << ":"
834             << std::hex << std::setw(4) << std::setfill('0')
835                                 << ((is_request) ? client_.load() : _message->get_client()) << "] "
836             << "type=" << std::hex << static_cast<std::uint32_t>(_message->get_message_type())
837             << " thread=" << std::hex << std::this_thread::get_id();
838     }
839     if (routing_) {
840         // in case of requests set the request-id (client-id|session-id)
841         if (is_request) {
842             _message->set_client(client_);
843             _message->set_session(get_session());
844         }
845         // Always increment the session-id
846         (void)routing_->send(client_, _message);
847     }
848 }
849 
notify(service_t _service,instance_t _instance,event_t _event,std::shared_ptr<payload> _payload,bool _force) const850 void application_impl::notify(service_t _service, instance_t _instance,
851         event_t _event, std::shared_ptr<payload> _payload, bool _force) const {
852     if (routing_)
853         routing_->notify(_service, _instance, _event, _payload, _force);
854 }
855 
notify_one(service_t _service,instance_t _instance,event_t _event,std::shared_ptr<payload> _payload,client_t _client,bool _force) const856 void application_impl::notify_one(service_t _service, instance_t _instance,
857         event_t _event, std::shared_ptr<payload> _payload,
858         client_t _client, bool _force) const {
859     if (routing_) {
860         routing_->notify_one(_service, _instance, _event, _payload, _client,
861                 _force
862 #ifdef VSOMEIP_ENABLE_COMPAT
863                 , false
864 #endif
865                 );
866     }
867 }
868 
register_state_handler(state_handler_t _handler)869 void application_impl::register_state_handler(state_handler_t _handler) {
870     std::lock_guard<std::mutex> its_lock(state_handler_mutex_);
871     handler_ = _handler;
872 }
873 
unregister_state_handler()874 void application_impl::unregister_state_handler() {
875     std::lock_guard<std::mutex> its_lock(state_handler_mutex_);
876     handler_ = nullptr;
877 }
878 
register_availability_handler(service_t _service,instance_t _instance,availability_handler_t _handler,major_version_t _major,minor_version_t _minor)879 void application_impl::register_availability_handler(service_t _service,
880         instance_t _instance, availability_handler_t _handler,
881         major_version_t _major, minor_version_t _minor) {
882     std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
883     if (state_ == state_type_e::ST_REGISTERED) {
884         do_register_availability_handler(_service, _instance,
885                 _handler, _major, _minor);
886     } else {
887         availability_[_service][_instance][_major][_minor] = std::make_pair(
888                 _handler, false);
889     }
890 }
891 
do_register_availability_handler(service_t _service,instance_t _instance,availability_handler_t _handler,major_version_t _major,minor_version_t _minor)892 void application_impl::do_register_availability_handler(service_t _service,
893         instance_t _instance, availability_handler_t _handler,
894         major_version_t _major, minor_version_t _minor) {
895         available_t available;
896     bool are_available = are_available_unlocked(available, _service, _instance, _major, _minor);
897     availability_[_service][_instance][_major][_minor] = std::make_pair(
898             _handler, true);
899 
900     std::lock_guard<std::mutex> handlers_lock(handlers_mutex_);
901 
902     std::shared_ptr<sync_handler> its_sync_handler
903         = std::make_shared<sync_handler>([_handler, are_available, available]() {
904                  for(const auto& available_services_it : available)
905                      for(const auto& available_instances_it : available_services_it.second)
906                          _handler(available_services_it.first, available_instances_it.first, are_available);
907              });
908     its_sync_handler->handler_type_ = handler_type_e::AVAILABILITY;
909     its_sync_handler->service_id_ = _service;
910     its_sync_handler->instance_id_ = _instance;
911     handlers_.push_back(its_sync_handler);
912 
913     dispatcher_condition_.notify_one();
914 }
915 
unregister_availability_handler(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)916 void application_impl::unregister_availability_handler(service_t _service,
917         instance_t _instance, major_version_t _major, minor_version_t _minor) {
918     std::lock_guard<std::recursive_mutex> its_lock(availability_mutex_);
919     auto found_service = availability_.find(_service);
920     if (found_service != availability_.end()) {
921         auto found_instance = found_service->second.find(_instance);
922         if (found_instance != found_service->second.end()) {
923             auto found_major = found_instance->second.find(_major);
924             if (found_major != found_instance->second.end()) {
925                 auto found_minor = found_major->second.find(_minor);
926                 if (found_minor != found_major->second.end()) {
927                     found_major->second.erase(_minor);
928 
929                     if (!found_major->second.size()) {
930                         found_instance->second.erase(_major);
931                         if (!found_instance->second.size()) {
932                             found_service->second.erase(_instance);
933                             if (!found_service->second.size()) {
934                                 availability_.erase(_service);
935                             }
936                         }
937                     }
938                 }
939             }
940         }
941     }
942 }
943 
on_subscription(service_t _service,instance_t _instance,eventgroup_t _eventgroup,client_t _client,uid_t _uid,gid_t _gid,bool _subscribed,std::function<void (bool)> _accepted_cb)944 void application_impl::on_subscription(service_t _service, instance_t _instance,
945         eventgroup_t _eventgroup, client_t _client, uid_t _uid, gid_t _gid,
946         bool _subscribed, std::function<void(bool)> _accepted_cb) {
947     bool handler_found = false;
948     std::pair<subscription_handler_t, async_subscription_handler_t> its_handlers;
949     {
950         std::lock_guard<std::mutex> its_lock(subscription_mutex_);
951         auto found_service = subscription_.find(_service);
952         if (found_service != subscription_.end()) {
953             auto found_instance = found_service->second.find(_instance);
954             if (found_instance != found_service->second.end()) {
955                 auto found_eventgroup = found_instance->second.find(_eventgroup);
956                 if (found_eventgroup != found_instance->second.end()) {
957                     its_handlers = found_eventgroup->second;
958                     handler_found = true;
959                 }
960             }
961         }
962     }
963 
964     if (handler_found) {
965         if(auto its_handler = its_handlers.first) {
966             // "normal" subscription handler exists
967             _accepted_cb(its_handler(_client, _uid, _gid, _subscribed));
968         } else if(auto its_handler = its_handlers.second) {
969             // async subscription handler exists
970             its_handler(_client, _uid, _gid, _subscribed, _accepted_cb);
971         }
972     } else {
973         _accepted_cb(true);
974     }
975 }
976 
register_subscription_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,subscription_handler_t _handler)977 void application_impl::register_subscription_handler(service_t _service,
978         instance_t _instance, eventgroup_t _eventgroup,
979         subscription_handler_t _handler) {
980 
981     std::lock_guard<std::mutex> its_lock(subscription_mutex_);
982     subscription_[_service][_instance][_eventgroup] = std::make_pair(_handler, nullptr);
983 }
984 
unregister_subscription_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup)985 void application_impl::unregister_subscription_handler(service_t _service,
986         instance_t _instance, eventgroup_t _eventgroup) {
987     std::lock_guard<std::mutex> its_lock(subscription_mutex_);
988     auto found_service = subscription_.find(_service);
989     if (found_service != subscription_.end()) {
990         auto found_instance = found_service->second.find(_instance);
991         if (found_instance != found_service->second.end()) {
992             auto found_eventgroup = found_instance->second.find(_eventgroup);
993             if (found_eventgroup != found_instance->second.end()) {
994                 found_instance->second.erase(_eventgroup);
995             }
996         }
997     }
998 }
999 
on_subscription_status(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,uint16_t _error)1000 void application_impl::on_subscription_status(service_t _service,
1001         instance_t _instance, eventgroup_t _eventgroup, event_t _event,
1002         uint16_t _error) {
1003     bool entry_found(false);
1004     {
1005         auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _event);
1006         std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
1007         auto its_subscription_state = subscription_state_.find(its_tuple);
1008         if (its_subscription_state == subscription_state_.end()) {
1009             its_tuple = std::make_tuple(_service, _instance, _eventgroup, ANY_EVENT);
1010             auto its_any_subscription_state = subscription_state_.find(its_tuple);
1011             if (its_any_subscription_state == subscription_state_.end()) {
1012                 VSOMEIP_TRACE << std::hex << get_client( )
1013                         << " application_impl::on_subscription_status: "
1014                         << "Received a subscription status without subscribe for "
1015                         << std::hex << _service << "/" << _instance << "/"
1016                         << _eventgroup << "/" << _event << "/error=" << _error;
1017             } else {
1018                 entry_found = true;
1019             }
1020         } else {
1021             entry_found = true;
1022         }
1023         if (entry_found) {
1024             if (_error) {
1025                 subscription_state_[its_tuple] =
1026                         subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED;
1027             } else {
1028                 subscription_state_[its_tuple] =
1029                         subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED;
1030             }
1031         }
1032     }
1033 
1034     if (entry_found) {
1035         deliver_subscription_state(_service, _instance, _eventgroup, _event, _error);
1036     }
1037 }
1038 
deliver_subscription_state(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,uint16_t _error)1039 void application_impl::deliver_subscription_state(service_t _service, instance_t _instance,
1040         eventgroup_t _eventgroup, event_t _event, uint16_t _error) {
1041     std::vector<subscription_status_handler_t> handlers;
1042     {
1043         std::lock_guard<std::mutex> its_lock(subscription_status_handlers_mutex_);
1044         auto found_service = subscription_status_handlers_.find(_service);
1045         if (found_service != subscription_status_handlers_.end()) {
1046             auto found_instance = found_service->second.find(_instance);
1047             if (found_instance != found_service->second.end()) {
1048                 auto found_eventgroup = found_instance->second.find(_eventgroup);
1049                 if (found_eventgroup != found_instance->second.end()) {
1050                     auto found_event = found_eventgroup->second.find(_event);
1051                     if (found_event != found_eventgroup->second.end()) {
1052                         if (!_error || (_error && found_event->second.second)) {
1053                             handlers.push_back(found_event->second.first);
1054                         }
1055                     } else {
1056                         auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1057                         if (its_any_event != found_eventgroup->second.end()) {
1058                             if (!_error || (_error && its_any_event->second.second)) {
1059                                 handlers.push_back(its_any_event->second.first);
1060                             }
1061                         }
1062                     }
1063                 }
1064             }
1065             found_instance = found_service->second.find(ANY_INSTANCE);
1066             if (found_instance != found_service->second.end()) {
1067                 auto found_eventgroup = found_instance->second.find(_eventgroup);
1068                 if (found_eventgroup != found_instance->second.end()) {
1069                     auto found_event = found_eventgroup->second.find(_event);
1070                     if (found_event != found_eventgroup->second.end()) {
1071                         if (!_error || (_error && found_event->second.second)) {
1072                             handlers.push_back(found_event->second.first);
1073                         }
1074                     } else {
1075                         auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1076                         if (its_any_event != found_eventgroup->second.end()) {
1077                             if (!_error || (_error && its_any_event->second.second)) {
1078                                 handlers.push_back(its_any_event->second.first);
1079                             }
1080                         }
1081                     }
1082                 }
1083             }
1084         }
1085         found_service = subscription_status_handlers_.find(ANY_SERVICE);
1086         if (found_service != subscription_status_handlers_.end()) {
1087             auto found_instance = found_service->second.find(_instance);
1088             if (found_instance != found_service->second.end()) {
1089                 auto found_eventgroup = found_instance->second.find(_eventgroup);
1090                 if (found_eventgroup != found_instance->second.end()) {
1091                     auto found_event = found_eventgroup->second.find(_event);
1092                     if (found_event != found_eventgroup->second.end()) {
1093                         if (!_error || (_error && found_event->second.second)) {
1094                             handlers.push_back(found_event->second.first);
1095                         }
1096                     } else {
1097                         auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1098                         if (its_any_event != found_eventgroup->second.end()) {
1099                             if (!_error || (_error && its_any_event->second.second)) {
1100                                 handlers.push_back(its_any_event->second.first);
1101                             }
1102                         }
1103                     }
1104                 }
1105             }
1106             found_instance = found_service->second.find(ANY_INSTANCE);
1107             if (found_instance != found_service->second.end()) {
1108                 auto found_eventgroup = found_instance->second.find(_eventgroup);
1109                 if (found_eventgroup != found_instance->second.end()) {
1110                     auto found_event = found_eventgroup->second.find(_event);
1111                     if (found_event != found_eventgroup->second.end()) {
1112                         if (!_error || (_error && found_event->second.second)) {
1113                             handlers.push_back(found_event->second.first);
1114                         }
1115                     } else {
1116                         auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1117                         if (its_any_event != found_eventgroup->second.end()) {
1118                             if (!_error || (_error && its_any_event->second.second)) {
1119                                 handlers.push_back(its_any_event->second.first);
1120                             }
1121                         }
1122                     }
1123                 }
1124             }
1125         }
1126     }
1127     {
1128         std::unique_lock<std::mutex> handlers_lock(handlers_mutex_);
1129         for (auto &handler : handlers) {
1130             std::shared_ptr<sync_handler> its_sync_handler
1131                 = std::make_shared<sync_handler>([handler, _service,
1132                                                   _instance, _eventgroup,
1133                                                   _event, _error]() {
1134                                 handler(_service, _instance,
1135                                         _eventgroup, _event, _error);
1136                                                  });
1137             its_sync_handler->handler_type_ = handler_type_e::SUBSCRIPTION;
1138             its_sync_handler->service_id_ = _service;
1139             its_sync_handler->instance_id_ = _instance;
1140             its_sync_handler->method_id_ = _event;
1141             its_sync_handler->eventgroup_id_ = _eventgroup;
1142             handlers_.push_back(its_sync_handler);
1143         }
1144         if (handlers.size()) {
1145             dispatcher_condition_.notify_one();
1146         }
1147     }
1148 }
1149 
register_subscription_status_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,subscription_status_handler_t _handler,bool _is_selective)1150 void application_impl::register_subscription_status_handler(service_t _service,
1151             instance_t _instance, eventgroup_t _eventgroup, event_t _event,
1152             subscription_status_handler_t _handler, bool _is_selective) {
1153     std::lock_guard<std::mutex> its_lock(subscription_status_handlers_mutex_);
1154     if (_handler) {
1155         subscription_status_handlers_[_service][_instance][_eventgroup][_event] =
1156                 std::make_pair(_handler, _is_selective);
1157     } else {
1158         VSOMEIP_WARNING <<
1159                 "application_impl::register_subscription_status_handler: "
1160                 "_handler is null, for unregistration please use "
1161                 "application_impl::unregister_subscription_status_handler ["
1162                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1163                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
1164                 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
1165                 << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
1166     }
1167 }
1168 
unregister_subscription_status_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)1169 void application_impl::unregister_subscription_status_handler(service_t _service,
1170             instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
1171     std::lock_guard<std::mutex> its_lock(subscription_status_handlers_mutex_);
1172     auto its_service = subscription_status_handlers_.find(_service);
1173     if (its_service != subscription_status_handlers_.end()) {
1174         auto its_instance = its_service->second.find(_instance);
1175         if (its_instance != its_service->second.end()) {
1176             auto its_eventgroup = its_instance->second.find(_eventgroup);
1177             if (its_eventgroup != its_instance->second.end()) {
1178                 its_eventgroup->second.erase(_event);
1179                 if (its_eventgroup->second.empty()) {
1180                     its_instance->second.erase(_eventgroup);
1181                     if (its_instance->second.empty()) {
1182                         its_service->second.erase(_instance);
1183                         if (its_service->second.empty()) {
1184                             subscription_status_handlers_.erase(_service);
1185                         }
1186                     }
1187                 }
1188             }
1189         }
1190     }
1191 }
1192 
register_message_handler(service_t _service,instance_t _instance,method_t _method,message_handler_t _handler)1193 void application_impl::register_message_handler(service_t _service,
1194         instance_t _instance, method_t _method, message_handler_t _handler) {
1195     std::lock_guard<std::mutex> its_lock(members_mutex_);
1196     members_[_service][_instance][_method] = _handler;
1197 }
1198 
unregister_message_handler(service_t _service,instance_t _instance,method_t _method)1199 void application_impl::unregister_message_handler(service_t _service,
1200         instance_t _instance, method_t _method) {
1201     std::lock_guard<std::mutex> its_lock(members_mutex_);
1202     auto found_service = members_.find(_service);
1203     if (found_service != members_.end()) {
1204         auto found_instance = found_service->second.find(_instance);
1205         if (found_instance != found_service->second.end()) {
1206             auto found_method = found_instance->second.find(_method);
1207             if (found_method != found_instance->second.end()) {
1208                 found_instance->second.erase(_method);
1209             }
1210         }
1211     }
1212 }
1213 
offer_event(service_t _service,instance_t _instance,event_t _notifier,const std::set<eventgroup_t> & _eventgroups,event_type_e _type,std::chrono::milliseconds _cycle,bool _change_resets_cycle,bool _update_on_change,const epsilon_change_func_t & _epsilon_change_func,reliability_type_e _reliability)1214 void application_impl::offer_event(service_t _service, instance_t _instance,
1215            event_t _notifier, const std::set<eventgroup_t> &_eventgroups,
1216            event_type_e _type,
1217            std::chrono::milliseconds _cycle, bool _change_resets_cycle,
1218            bool _update_on_change,
1219            const epsilon_change_func_t &_epsilon_change_func,
1220            reliability_type_e _reliability) {
1221        if (routing_)
1222            routing_->register_event(client_,
1223                    _service, _instance,
1224                    _notifier, _eventgroups, _type, _reliability,
1225                    _cycle, _change_resets_cycle, _update_on_change,
1226                    _epsilon_change_func, true);
1227 }
1228 
stop_offer_event(service_t _service,instance_t _instance,event_t _event)1229 void application_impl::stop_offer_event(service_t _service, instance_t _instance,
1230        event_t _event) {
1231    if (routing_)
1232        routing_->unregister_event(client_, _service, _instance, _event, true);
1233 }
1234 
request_event(service_t _service,instance_t _instance,event_t _event,const std::set<eventgroup_t> & _eventgroups,event_type_e _type,reliability_type_e _reliability)1235 void application_impl::request_event(service_t _service, instance_t _instance,
1236            event_t _event, const std::set<eventgroup_t> &_eventgroups,
1237            event_type_e _type, reliability_type_e _reliability) {
1238        if (routing_)
1239            routing_->register_event(client_,
1240                    _service, _instance,
1241                    _event, _eventgroups, _type, _reliability,
1242                    std::chrono::milliseconds::zero(), false, true,
1243                    nullptr,
1244                    false);
1245 }
1246 
release_event(service_t _service,instance_t _instance,event_t _event)1247 void application_impl::release_event(service_t _service, instance_t _instance,
1248        event_t _event) {
1249    if (routing_)
1250        routing_->unregister_event(client_, _service, _instance, _event, false);
1251 }
1252 
1253 // Interface "routing_manager_host"
get_name() const1254 const std::string & application_impl::get_name() const {
1255     return name_;
1256 }
1257 
get_client() const1258 client_t application_impl::get_client() const {
1259     return client_;
1260 }
1261 
set_client(const client_t & _client)1262 void application_impl::set_client(const client_t &_client) {
1263     client_ = _client;
1264 }
1265 
get_session()1266 session_t application_impl::get_session() {
1267 
1268 #ifdef VSOMEIP_HAS_SESSION_HANDLING_CONFIG
1269     if (!has_session_handling_)
1270         return (0);
1271 #endif // VSOMEIP_HAS_SESSION_HANDLING_CONFIG
1272 
1273     std::lock_guard<std::mutex> its_lock(session_mutex_);
1274     if (0 == ++session_) {
1275         // Smallest allowed session identifier
1276         session_ = 1;
1277     }
1278 
1279     return session_;
1280 }
1281 
get_configuration() const1282 std::shared_ptr<configuration> application_impl::get_configuration() const {
1283     return configuration_;
1284 }
1285 
get_diagnosis() const1286 diagnosis_t application_impl::get_diagnosis() const {
1287     return configuration_->get_diagnosis_address();
1288 }
1289 
get_io()1290 boost::asio::io_service & application_impl::get_io() {
1291     return io_;
1292 }
1293 
on_state(state_type_e _state)1294 void application_impl::on_state(state_type_e _state) {
1295     {
1296         std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
1297         if (state_ != _state) {
1298             state_ = _state;
1299             if (state_ == state_type_e::ST_REGISTERED) {
1300                 for (const auto &its_service : availability_) {
1301                     for (const auto &its_instance : its_service.second) {
1302                         for (const auto &its_major : its_instance.second) {
1303                             for (const auto &its_minor : its_major.second) {
1304                                 if (!its_minor.second.second) {
1305                                     do_register_availability_handler(
1306                                             its_service.first,
1307                                             its_instance.first,
1308                                             its_minor.second.first,
1309                                             its_major.first, its_minor.first);
1310                                 }
1311                             }
1312                         }
1313                     }
1314                 }
1315             } else {
1316                 // Call on_availability callback on each service
1317                 for (const auto &its_service : availability_) {
1318                     for (const auto &its_instance : its_service.second) {
1319                         for (const auto &its_major : its_instance.second) {
1320                             for (const auto &its_minor : its_major.second) {
1321                                 on_availability(its_service.first, its_instance.first, false, its_major.first, its_minor.first);
1322                             }
1323                         }
1324                     }
1325                 }
1326             }
1327         }
1328     }
1329     bool has_state_handler(false);
1330     state_handler_t handler = nullptr;
1331     {
1332         std::lock_guard<std::mutex> its_lock(state_handler_mutex_);
1333         if (handler_) {
1334             has_state_handler = true;
1335             handler = handler_;
1336         }
1337     }
1338     if (has_state_handler) {
1339         std::lock_guard<std::mutex> its_lock(handlers_mutex_);
1340         std::shared_ptr<sync_handler> its_sync_handler
1341             = std::make_shared<sync_handler>([handler, _state]() {
1342                                                 handler(_state);
1343                                              });
1344         its_sync_handler->handler_type_ = handler_type_e::STATE;
1345         handlers_.push_back(its_sync_handler);
1346         dispatcher_condition_.notify_one();
1347     }
1348 }
1349 
// Processes an availability change of a service instance version.
//
// Steps, as implemented below:
//  1. Under availability_mutex_: return immediately if the reported state
//     equals what is_available_unlocked() already reports.
//  2. Update the available_ bookkeeping.
//  3. Collect all availability handlers registered in availability_ that
//     match the service/instance (exact or ANY_*) and version.
//  4. Queue one AVAILABILITY sync handler per collected handler in handlers_.
//  5. If the service became unavailable: reset the flags stored in
//     subscriptions_ and the states stored in subscription_state_ for this
//     service/instance.
//  6. Notify one dispatcher if at least one handler was queued.
void application_impl::on_availability(service_t _service, instance_t _instance,
        bool _is_available, major_version_t _major, minor_version_t _minor) {
    std::vector<availability_handler_t> its_handlers;
    {
        std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
        // Nothing to do if the reported state is already the known state.
        if (_is_available == is_available_unlocked(_service, _instance, _major, _minor)) {
            return;
        }

        if (_is_available) {
            // Remember the minor version that is available for this major.
            available_[_service][_instance][_major] = _minor;
        } else {
            auto found_available_service = available_.find(_service);
            if (found_available_service != available_.end()) {
                auto found_instance = found_available_service->second.find(_instance);
                if( found_instance != found_available_service->second.end()) {
                    auto found_major = found_instance->second.find(_major);
                    if( found_major != found_instance->second.end() ){
                        // NOTE(review): this erases the whole instance entry
                        // (all majors recorded for it), not only _major -
                        // confirm this is intended.
                        if( _minor == found_major->second)
                            found_available_service->second.erase(_instance);
                    }
                }
            }
        }

        // Appends to its_handlers every handler stored in the given
        // major->minor map that matches the reported version:
        //  - for the exact major and for ANY_MAJOR,
        //  - every registered minor from _minor down to 0, followed by the
        //    handler registered for ANY_MINOR.
        // A _minor whose int32 cast is negative skips the counting loop.
        auto find_matching_handler =
                [&](const availability_major_minor_t& _av_ma_mi_it) {
            auto found_major = _av_ma_mi_it.find(_major);
            if (found_major != _av_ma_mi_it.end()) {
                for (std::int32_t mi = static_cast<std::int32_t>(_minor); mi >= 0; mi--) {
                    const auto found_minor = found_major->second.find(static_cast<minor_version_t>(mi));
                    if (found_minor != found_major->second.end()) {
                        its_handlers.push_back(found_minor->second.first);
                    }
                }
                const auto found_any_minor = found_major->second.find(ANY_MINOR);
                if (found_any_minor != found_major->second.end()) {
                    its_handlers.push_back(found_any_minor->second.first);
                }
            }
            found_major = _av_ma_mi_it.find(ANY_MAJOR);
            if (found_major != _av_ma_mi_it.end()) {
                for (std::int32_t mi = static_cast<std::int32_t>(_minor); mi >= 0; mi--) {
                    const auto found_minor = found_major->second.find(static_cast<minor_version_t>(mi));
                    if (found_minor != found_major->second.end()) {
                        its_handlers.push_back(found_minor->second.first);
                    }
                }
                const auto found_any_minor = found_major->second.find(ANY_MINOR);
                if (found_any_minor != found_major->second.end()) {
                    its_handlers.push_back(found_any_minor->second.first);
                }
            }
        };

        // Handlers registered for the exact service: exact instance first,
        // then ANY_INSTANCE.
        auto found_service = availability_.find(_service);
        if (found_service != availability_.end()) {
            auto found_instance = found_service->second.find(_instance);
            if (found_instance != found_service->second.end()) {
                find_matching_handler(found_instance->second);
            }
            found_instance = found_service->second.find(ANY_INSTANCE);
            if (found_instance != found_service->second.end()) {
                find_matching_handler(found_instance->second);
            }
        }
        // Handlers registered for ANY_SERVICE: exact instance first, then
        // ANY_INSTANCE.
        found_service = availability_.find(ANY_SERVICE);
        if (found_service != availability_.end()) {
            auto found_instance = found_service->second.find(_instance);
            if( found_instance != found_service->second.end()) {
                find_matching_handler(found_instance->second);
            }
            found_instance = found_service->second.find(ANY_INSTANCE);
            if( found_instance != found_service->second.end()) {
                find_matching_handler(found_instance->second);
            }
        }
        {
            // Queue the collected handlers; handlers_mutex_ is taken while
            // availability_mutex_ is still held.
            std::lock_guard<std::mutex> handlers_lock(handlers_mutex_);
            for (const auto &handler : its_handlers) {
                std::shared_ptr<sync_handler> its_sync_handler =
                        std::make_shared<sync_handler>(
                                [handler, _service, _instance, _is_available]()
                                {
                                    handler(_service, _instance, _is_available);
                                });
                its_sync_handler->handler_type_ = handler_type_e::AVAILABILITY;
                its_sync_handler->service_id_ = _service;
                its_sync_handler->instance_id_ = _instance;
                handlers_.push_back(its_sync_handler);
            }
        }
    }
    if (!_is_available) {
        {
            // Set the flag of every event/eventgroup entry stored for this
            // service instance in subscriptions_ to false.
            std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
            auto found_service = subscriptions_.find(_service);
            if (found_service != subscriptions_.end()) {
                auto found_instance = found_service->second.find(_instance);
                if (found_instance != found_service->second.end()) {
                    for (auto &event : found_instance->second) {
                        for (auto &eventgroup : event.second) {
                            eventgroup.second = false;
                        }
                    }
                }
            }
        }
        {
            // Mark every subscription state of this service instance as not
            // acknowledged.
            std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
            for (auto &its_subscription_state : subscription_state_) {
                if (std::get<0>(its_subscription_state.first) == _service &&
                        std::get<1>(its_subscription_state.first) == _instance) {
                    its_subscription_state.second =
                            subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED;
                }
            }
        }
    }

    // Wake up one dispatcher if handlers were queued above.
    if (its_handlers.size()) {
        std::lock_guard<std::mutex> handlers_lock(handlers_mutex_);
        dispatcher_condition_.notify_one();
    }
}
1475 
on_message(std::shared_ptr<message> && _message)1476 void application_impl::on_message(std::shared_ptr<message> &&_message) {
1477     const service_t its_service = _message->get_service();
1478     const instance_t its_instance = _message->get_instance();
1479     const method_t its_method = _message->get_method();
1480 
1481     if (_message->get_message_type() == message_type_e::MT_NOTIFICATION) {
1482         if (!check_for_active_subscription(its_service, its_instance,
1483                 static_cast<event_t>(its_method))) {
1484             VSOMEIP_ERROR << "application_impl::on_message ["
1485                 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
1486                 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
1487                 << std::hex << std::setw(4) << std::setfill('0') << its_method << "]";
1488             return;
1489         }
1490     }
1491 
1492     {
1493         std::lock_guard<std::mutex> its_lock(members_mutex_);
1494         std::set<message_handler> its_handlers;
1495         auto found_service = members_.find(its_service);
1496         if (found_service != members_.end()) {
1497             auto found_instance = found_service->second.find(its_instance);
1498             if (found_instance != found_service->second.end()) {
1499                 auto found_method = found_instance->second.find(its_method);
1500                 if (found_method != found_instance->second.end()) {
1501                     its_handlers.insert(found_method->second);
1502                 }
1503                 auto found_any_method = found_instance->second.find(ANY_METHOD);
1504                 if (found_any_method != found_instance->second.end()) {
1505                     its_handlers.insert(found_any_method->second);
1506                 }
1507             }
1508             auto found_any_instance = found_service->second.find(ANY_INSTANCE);
1509             if (found_any_instance != found_service->second.end()) {
1510                 auto found_method = found_any_instance->second.find(its_method);
1511                 if (found_method != found_any_instance->second.end()) {
1512                     its_handlers.insert(found_method->second);
1513                 }
1514                 auto found_any_method = found_any_instance->second.find(ANY_METHOD);
1515                 if (found_any_method != found_any_instance->second.end()) {
1516                     its_handlers.insert(found_any_method->second);
1517                 }
1518             }
1519         }
1520         auto found_any_service = members_.find(ANY_SERVICE);
1521         if (found_any_service != members_.end()) {
1522             auto found_instance = found_any_service->second.find(its_instance);
1523             if (found_instance != found_any_service->second.end()) {
1524                 auto found_method = found_instance->second.find(its_method);
1525                 if (found_method != found_instance->second.end()) {
1526                     its_handlers.insert(found_method->second);
1527                 }
1528                 auto found_any_method = found_instance->second.find(ANY_METHOD);
1529                 if (found_any_method != found_instance->second.end()) {
1530                     its_handlers.insert(found_any_method->second);
1531                 }
1532             }
1533             auto found_any_instance = found_any_service->second.find(ANY_INSTANCE);
1534             if (found_any_instance != found_any_service->second.end()) {
1535                 auto found_method = found_any_instance->second.find(its_method);
1536                 if (found_method != found_any_instance->second.end()) {
1537                     its_handlers.insert(found_method->second);
1538                 }
1539                 auto found_any_method = found_any_instance->second.find(ANY_METHOD);
1540                 if (found_any_method != found_any_instance->second.end()) {
1541                     its_handlers.insert(found_any_method->second);
1542                 }
1543             }
1544         }
1545 
1546         if (its_handlers.size()) {
1547             std::lock_guard<std::mutex> its_lock(handlers_mutex_);
1548             for (const auto &its_handler : its_handlers) {
1549                 auto handler = its_handler.handler_;
1550                 std::shared_ptr<sync_handler> its_sync_handler =
1551                         std::make_shared<sync_handler>([handler, _message]() {
1552                             handler(_message);
1553                         });
1554                 its_sync_handler->handler_type_ = handler_type_e::MESSAGE;
1555                 its_sync_handler->service_id_ = _message->get_service();
1556                 its_sync_handler->instance_id_ = _message->get_instance();
1557                 its_sync_handler->method_id_ = _message->get_method();
1558                 its_sync_handler->session_id_ = _message->get_session();
1559                 handlers_.push_back(its_sync_handler);
1560             }
1561             dispatcher_condition_.notify_one();
1562         }
1563     }
1564 }
1565 
1566 // Interface "service_discovery_host"
get_routing_manager() const1567 routing_manager * application_impl::get_routing_manager() const {
1568     return routing_.get();
1569 }
1570 
// Thread function of the main dispatcher.
//
// Runs queued sync handlers from handlers_ until is_dispatching_ becomes
// false. handlers_mutex_ is held through its_lock for the whole loop except
// while a handler is being invoked.
void application_impl::main_dispatch() {
#ifndef _WIN32
    {
        // Name the thread "<client id as 4 hex digits>_m_dispatch".
        std::stringstream s;
        s << std::hex << std::setw(4) << std::setfill('0')
            << client_ << "_m_dispatch";
        pthread_setname_np(pthread_self(),s.str().c_str());
    }
#endif
    const std::thread::id its_id = std::this_thread::get_id();
    VSOMEIP_INFO << "main dispatch thread id from application: "
            << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
            << name_ << ") is: " << std::hex << its_id
#ifndef _WIN32
            << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
#endif
            ;
    std::unique_lock<std::mutex> its_lock(handlers_mutex_);
    while (is_dispatching_) {
        if (handlers_.empty() || !is_active_dispatcher(its_id)) {
            // Cancel other waiting dispatcher
            dispatcher_condition_.notify_all();
            // Wait for new handlers to execute
            while (is_dispatching_ && (handlers_.empty() || !is_active_dispatcher(its_id))) {
                dispatcher_condition_.wait(its_lock);
            }
        } else {
            std::shared_ptr<sync_handler> its_handler;
            // Drain handlers as long as this thread is the active dispatcher.
            while (is_dispatching_  && is_active_dispatcher(its_id)
                   && (its_handler = get_next_handler())) {
                // The handler runs without handlers_mutex_ being held.
                its_lock.unlock();
                invoke_handler(its_handler);

                // Dispatching was stopped while the handler ran: leave
                // without re-acquiring the lock.
                if (!is_dispatching_)
                    return;

                its_lock.lock();

                // Both calls below are made with handlers_mutex_ held.
                reschedule_availability_handler(its_handler);
                remove_elapsed_dispatchers();

#ifdef _WIN32
                if(!is_dispatching_) {
                    its_lock.unlock();
                    return;
                }
#endif
            }
        }
    }
    its_lock.unlock();
}
1623 
// Thread function of an additional dispatcher thread.
//
// Runs queued sync handlers while is_active_dispatcher() reports this thread
// as active. When the thread stops being active while dispatching is still
// enabled, it records its id in elapsed_dispatchers_ before returning.
// handlers_mutex_ is held through its_lock except while a handler is being
// invoked.
void application_impl::dispatch() {
#ifndef _WIN32
    {
        // Name the thread "<client id as 4 hex digits>_dispatch".
        std::stringstream s;
        s << std::hex << std::setw(4) << std::setfill('0')
            << client_ << "_dispatch";
        pthread_setname_np(pthread_self(),s.str().c_str());
    }
#endif
    const std::thread::id its_id = std::this_thread::get_id();
    VSOMEIP_INFO << "dispatch thread id from application: "
            << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
            << name_ << ") is: " << std::hex << its_id
#ifndef _WIN32
            << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
#endif
            ;
    std::unique_lock<std::mutex> its_lock(handlers_mutex_);
    while (is_active_dispatcher(its_id)) {
        if (is_dispatching_ && handlers_.empty()) {
             dispatcher_condition_.wait(its_lock);
             // Maybe woken up from main dispatcher
             if (handlers_.empty() && !is_active_dispatcher(its_id)) {
                 if (!is_dispatching_) {
                     return;
                 }
                 // Note: this its_lock (on dispatcher_mutex_) shadows the
                 // outer its_lock (on handlers_mutex_).
                 std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
                 elapsed_dispatchers_.insert(its_id);
                 return;
             }
        } else {
            std::shared_ptr<sync_handler> its_handler;
            // Drain handlers as long as this thread is the active dispatcher.
            while (is_dispatching_ && is_active_dispatcher(its_id)
                   && (its_handler = get_next_handler())) {
                // The handler runs without handlers_mutex_ being held.
                its_lock.unlock();
                invoke_handler(its_handler);

                // Dispatching was stopped while the handler ran: leave
                // without re-acquiring the lock.
                if (!is_dispatching_)
                    return;

                its_lock.lock();

                // Both calls below are made with handlers_mutex_ held.
                reschedule_availability_handler(its_handler);
                remove_elapsed_dispatchers();
            }
        }
    }
    // No longer the active dispatcher: record this thread as elapsed (only
    // while dispatching is still enabled) and wake up the other dispatchers.
    if (is_dispatching_) {
        std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
        elapsed_dispatchers_.insert(its_id);
    }
    dispatcher_condition_.notify_all();
}
1677 
get_next_handler()1678 std::shared_ptr<application_impl::sync_handler> application_impl::get_next_handler() {
1679     std::shared_ptr<sync_handler> its_next_handler;
1680     while (!handlers_.empty() && !its_next_handler) {
1681         its_next_handler = handlers_.front();
1682         handlers_.pop_front();
1683 
1684         // Check handler
1685         if (its_next_handler->handler_type_ == handler_type_e::AVAILABILITY) {
1686             const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
1687                     its_next_handler->service_id_,
1688                     its_next_handler->instance_id_);
1689             auto found_si = availability_handlers_.find(its_si_pair);
1690             if (found_si != availability_handlers_.end()
1691                     && !found_si->second.empty()
1692                     && found_si->second.front() != its_next_handler) {
1693                 found_si->second.push_back(its_next_handler);
1694                 // There is a running availability handler for this service.
1695                 // Therefore, this one must wait...
1696                 its_next_handler = nullptr;
1697             } else {
1698                 availability_handlers_[its_si_pair].push_back(its_next_handler);
1699             }
1700         } else if (its_next_handler->handler_type_ == handler_type_e::MESSAGE) {
1701             const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
1702                     its_next_handler->service_id_,
1703                     its_next_handler->instance_id_);
1704             auto found_si = availability_handlers_.find(its_si_pair);
1705             if (found_si != availability_handlers_.end()
1706                     && found_si->second.size() > 1) {
1707                 // The message comes after the next availability handler
1708                 // Therefore, queue it to the last one
1709                 found_si->second.push_back(its_next_handler);
1710                 its_next_handler = nullptr;
1711             }
1712         }
1713     }
1714 
1715     return its_next_handler;
1716 }
1717 
reschedule_availability_handler(const std::shared_ptr<sync_handler> & _handler)1718 void application_impl::reschedule_availability_handler(
1719         const std::shared_ptr<sync_handler> &_handler) {
1720     if (_handler->handler_type_ == handler_type_e::AVAILABILITY) {
1721         const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
1722                 _handler->service_id_, _handler->instance_id_);
1723         auto found_si = availability_handlers_.find(its_si_pair);
1724         if (found_si != availability_handlers_.end()) {
1725             if (!found_si->second.empty()
1726                     && found_si->second.front() == _handler) {
1727                 found_si->second.pop_front();
1728 
1729                 // If there are other availability handlers pending, schedule
1730                 //  them and all handlers that were queued because of them
1731                 for (auto it = found_si->second.rbegin();
1732                         it != found_si->second.rend(); it++) {
1733                     handlers_.push_front(*it);
1734                 }
1735                 availability_handlers_.erase(found_si);
1736             }
1737             return;
1738         }
1739         VSOMEIP_WARNING << __func__
1740                 << ": An unknown availability handler returned!";
1741     }
1742 }
1743 
// Executes a single sync handler on the calling dispatcher thread and
// supervises its run time.
//
// A timer on io_ is armed with max_dispatch_time_ milliseconds before the
// handler is called. If it expires without error, the blocking call is
// reported and either the waiting dispatchers are notified (if
// has_active_dispatcher() is true) or a new dispatcher thread is started, as
// long as fewer than max_dispatchers_ exist. While the handler runs, the
// calling thread's id is kept in running_dispatchers_.
void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
    const std::thread::id its_id = std::this_thread::get_id();

    // Copy of the handler's identifying data (without the callable); this
    // copy is what the timer callback and the log output use.
    std::shared_ptr<sync_handler> its_sync_handler
        = std::make_shared<sync_handler>(_handler->service_id_,
            _handler->instance_id_, _handler->method_id_,
            _handler->session_id_, _handler->eventgroup_id_,
            _handler->handler_type_);

    boost::asio::steady_timer its_dispatcher_timer(io_);
    its_dispatcher_timer.expires_from_now(std::chrono::milliseconds(max_dispatch_time_));
    its_dispatcher_timer.async_wait([this, its_sync_handler](const boost::system::error_code &_error) {
        // Only act when the timer actually expired; a non-zero error code
        // (e.g. after the cancel() below) is ignored.
        if (!_error) {
            print_blocking_call(its_sync_handler);
            if (has_active_dispatcher()) {
                std::lock_guard<std::mutex> its_lock(handlers_mutex_);
                dispatcher_condition_.notify_all();
            } else {
                // If possible, create a new dispatcher thread to unblock.
                // If this is _not_ possible, dispatching is blocked until
                // at least one of the active handler calls returns.
                while (is_dispatching_) {
                    // try_lock + yield instead of a blocking lock; the loop
                    // also ends once is_dispatching_ becomes false.
                    if (dispatcher_mutex_.try_lock()) {
                        if (dispatchers_.size() < max_dispatchers_) {
                            if (is_dispatching_) {
                                auto its_dispatcher = std::make_shared<std::thread>(
                                    std::bind(&application_impl::dispatch, shared_from_this()));
                                dispatchers_[its_dispatcher->get_id()] = its_dispatcher;
                            } else {
                                VSOMEIP_INFO << "Won't start new dispatcher "
                                        "thread as Client=" << std::hex
                                        << get_client() << " is shutting down";
                            }
                        } else {
                            VSOMEIP_ERROR << "Maximum number of dispatchers exceeded.";
                        }
                        dispatcher_mutex_.unlock();
                        break;
                    } else {
                        std::this_thread::yield();
                    }
                }
            }
        }
    });
    // Optional log output: enabled by client_side_logging_; a non-empty
    // client_side_logging_filter_ must contain (service, ANY_INSTANCE) or
    // (service, instance).
    if (client_side_logging_
        && (client_side_logging_filter_.empty()
            || (1 == client_side_logging_filter_.count(std::make_tuple(its_sync_handler->service_id_, ANY_INSTANCE)))
            || (1 == client_side_logging_filter_.count(std::make_tuple(its_sync_handler->service_id_, its_sync_handler->instance_id_))))) {
        VSOMEIP_INFO << "Invoking handler: ("
            << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
            << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->service_id_ << "."
            << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->instance_id_ << "."
            << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->method_id_ << ":"
            << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->session_id_ << "] "
            << "type=" << static_cast<std::uint32_t>(its_sync_handler->handler_type_)
            << " thread=" << std::hex << its_id;
    }

    // Mark this thread as running a handler (skipped if dispatching stops
    // before the mutex could be acquired).
    while (is_dispatching_ ) {
        if (dispatcher_mutex_.try_lock()) {
            running_dispatchers_.insert(its_id);
            dispatcher_mutex_.unlock();
            break;
        }
        std::this_thread::yield();
    }

    if (is_dispatching_) {
        try {
            _handler->handler_();
        } catch (const std::exception &e) {
            // Only exceptions derived from std::exception are caught here.
            VSOMEIP_ERROR << "application_impl::invoke_handler caught exception: "
                    << e.what();
            print_blocking_call(its_sync_handler);
        }
    }
    // The handler is done (or was not run): stop the supervision timer.
    boost::system::error_code ec;
    its_dispatcher_timer.cancel(ec);

    // Unmark this thread (skipped if dispatching stops before the mutex
    // could be acquired).
    while (is_dispatching_ ) {
        if (dispatcher_mutex_.try_lock()) {
            running_dispatchers_.erase(its_id);
            dispatcher_mutex_.unlock();
            return;
        }
        std::this_thread::yield();
    }
}
1833 
has_active_dispatcher()1834 bool application_impl::has_active_dispatcher() {
1835     while (is_dispatching_) {
1836         if (dispatcher_mutex_.try_lock()) {
1837             for (const auto &d : dispatchers_) {
1838                 if (running_dispatchers_.find(d.first) == running_dispatchers_.end() &&
1839                     elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
1840                     dispatcher_mutex_.unlock();
1841                     return true;
1842                 }
1843             }
1844             dispatcher_mutex_.unlock();
1845             return false;
1846         }
1847         std::this_thread::yield();
1848     }
1849     return false;
1850 }
1851 
is_active_dispatcher(const std::thread::id & _id)1852 bool application_impl::is_active_dispatcher(const std::thread::id &_id) {
1853     while (is_dispatching_) {
1854         if (dispatcher_mutex_.try_lock()) {
1855             for (const auto &d : dispatchers_) {
1856                 if (d.first != _id &&
1857                     running_dispatchers_.find(d.first) == running_dispatchers_.end() &&
1858                     elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
1859                     dispatcher_mutex_.unlock();
1860                     return false;
1861                 }
1862             }
1863             dispatcher_mutex_.unlock();
1864             return true;
1865         }
1866         std::this_thread::yield();
1867     }
1868     return false;
1869 }
1870 
remove_elapsed_dispatchers()1871 void application_impl::remove_elapsed_dispatchers() {
1872     if (is_dispatching_) {
1873         std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
1874         for (auto id : elapsed_dispatchers_) {
1875             auto its_dispatcher = dispatchers_.find(id);
1876             if (its_dispatcher->second->joinable())
1877                 its_dispatcher->second->join();
1878             dispatchers_.erase(id);
1879         }
1880         elapsed_dispatchers_.clear();
1881     }
1882 }
1883 
clear_all_handler()1884 void application_impl::clear_all_handler() {
1885     unregister_state_handler();
1886     {
1887         std::lock_guard<std::mutex> its_lock(offered_services_handler_mutex_);
1888         offered_services_handler_ = nullptr;
1889     }
1890 
1891     {
1892         std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
1893         availability_.clear();
1894     }
1895 
1896     {
1897         std::lock_guard<std::mutex> its_lock(subscription_mutex_);
1898         subscription_.clear();
1899     }
1900 
1901     {
1902         std::lock_guard<std::mutex> its_lock(subscription_error_mutex_);
1903         eventgroup_error_handlers_.clear();
1904     }
1905 
1906     {
1907         std::lock_guard<std::mutex> its_lock(members_mutex_);
1908         members_.clear();
1909     }
1910     {
1911         std::lock_guard<std::mutex> its_lock(handlers_mutex_);
1912         handlers_.clear();
1913     }
1914 }
1915 
// Tears the application down once stop has been signalled.
//
// Order of operations:
//  1. Block until stopped_ is true (signalled through stop_cv_).
//  2. Set is_dispatching_ to false and wake up all dispatchers.
//  3. Join every dispatcher thread, except the one whose id equals
//     stop_caller_id_, which is detached; clear the dispatcher bookkeeping.
//  4. Stop the routing manager (if set).
//  5. Release work_ and stop io_.
//  6. Join and clear the I/O threads.
// Steps 3-6 each catch std::exception and only log it.
void application_impl::shutdown() {
    VSOMEIP_INFO << "shutdown thread id from application: "
            << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
            << name_ << ") is: " << std::hex << std::this_thread::get_id()
#ifndef _WIN32
            << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
#endif
    ;
#ifndef _WIN32
    // NOTE(review): presumably blocks signal delivery to this thread for the
    // lifetime of "blocker" (Boost.Asio detail type) - confirm.
    boost::asio::detail::posix_signal_blocker blocker;
    {
        // Name the thread "<client id as 4 hex digits>_shutdown".
        std::stringstream s;
        s << std::hex << std::setw(4) << std::setfill('0')
            << client_ << "_shutdown";
        pthread_setname_np(pthread_self(),s.str().c_str());
    }
#endif

    {
        // Wait until the application has been marked as stopped.
        std::unique_lock<std::mutex> its_lock(start_stop_mutex_);
        while(!stopped_) {
            stop_cv_.wait(its_lock);
        }
    }
    {
        // Tell all dispatcher threads to finish.
        std::lock_guard<std::mutex> its_handler_lock(handlers_mutex_);
        is_dispatching_ = false;
        dispatcher_condition_.notify_all();
    }

    try {
        std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
        for (const auto& its_dispatcher : dispatchers_) {
            if (its_dispatcher.second->get_id() != stop_caller_id_) {
                if (its_dispatcher.second->joinable()) {
                    its_dispatcher.second->join();
                }
            } else {
                // If the caller of stop() is one of our dispatchers
                // it can happen the shutdown mechanism will block
                // as that thread probably can't be joined. The reason
                // is the caller of stop() probably wants to join the
                // thread once call start (which got to the IO-Thread)
                // and which is expected to return after stop() has been
                // called.
                // Therefore detach this thread instead of joining because
                // after it will return to "main_dispatch" it will be
                // properly shutdown anyways because "is_dispatching_"
                // was set to "false" here.
                its_dispatcher.second->detach();
            }
        }
        // All dispatcher threads are joined or detached: drop the bookkeeping.
        availability_handlers_.clear();
        running_dispatchers_.clear();
        elapsed_dispatchers_.clear();
        dispatchers_.clear();
    } catch (const std::exception &e) {
        VSOMEIP_ERROR << "application_impl::" << __func__ << ": stopping dispatchers, "
                << " catched exception: " << e.what();
    }

    try {
        if (routing_)
            routing_->stop();
    } catch (const std::exception &e) {
        VSOMEIP_ERROR << "application_impl::" << __func__ << ": stopping routing, "
                << " catched exception: " << e.what();
    }

    try {
        // Release the work object before stopping the io_service.
        work_.reset();
        io_.stop();
    } catch (const std::exception &e) {
        VSOMEIP_ERROR << "application_impl::" << __func__ << ": stopping io, "
                << " catched exception: " << e.what();
    }

    try {
        // Join the I/O threads while holding start_stop_mutex_.
        std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
        for (const auto& t : io_threads_) {
            if (t->joinable()) {
                t->join();
            }
        }
        io_threads_.clear();
    } catch (const std::exception &e) {
        VSOMEIP_ERROR << "application_impl::" << __func__ << ": joining threads, "
                << " catched exception: " << e.what();
    }
}
2006 
is_routing() const2007 bool application_impl::is_routing() const {
2008     return is_routing_manager_host_;
2009 }
2010 
send_back_cached_event(service_t _service,instance_t _instance,event_t _event)2011 void application_impl::send_back_cached_event(service_t _service,
2012                                               instance_t _instance,
2013                                               event_t _event) {
2014     std::shared_ptr<event> its_event = routing_->find_event(_service,
2015             _instance, _event);
2016     if (its_event && its_event->is_field() && its_event->is_set()) {
2017         std::shared_ptr<message> its_message = runtime_->create_notification();
2018         its_message->set_service(_service);
2019         its_message->set_method(_event);
2020         its_message->set_instance(_instance);
2021         its_message->set_payload(its_event->get_payload());
2022         its_message->set_initial(true);
2023         on_message(std::move(its_message));
2024         VSOMEIP_INFO << "Sending back cached event ("
2025                 << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
2026                 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2027                 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2028                 << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
2029     }
2030 }
2031 
send_back_cached_eventgroup(service_t _service,instance_t _instance,eventgroup_t _eventgroup)2032 void application_impl::send_back_cached_eventgroup(service_t _service,
2033                                                    instance_t _instance,
2034                                                    eventgroup_t _eventgroup) {
2035     std::set<std::shared_ptr<event>> its_events = routing_->find_events(_service, _instance,
2036             _eventgroup);
2037     for(const auto &its_event : its_events) {
2038         if (its_event && its_event->is_field() && its_event->is_set()) {
2039             std::shared_ptr<message> its_message = runtime_->create_notification();
2040             const event_t its_event_id(its_event->get_event());
2041             its_message->set_service(_service);
2042             its_message->set_method(its_event_id);
2043             its_message->set_instance(_instance);
2044             its_message->set_payload(its_event->get_payload());
2045             its_message->set_initial(true);
2046             on_message(std::move(its_message));
2047             VSOMEIP_INFO << "Sending back cached event ("
2048                     << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
2049                     << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2050                     << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2051                     << std::hex << std::setw(4) << std::setfill('0') << its_event_id
2052                     << "] from eventgroup "
2053                     << std::hex << std::setw(4) << std::setfill('0') << _eventgroup;
2054         }
2055     }
2056 }
2057 
set_routing_state(routing_state_e _routing_state)2058 void application_impl::set_routing_state(routing_state_e _routing_state) {
2059     if (routing_)
2060         routing_->set_routing_state(_routing_state);
2061 }
2062 
check_send_back_cached_event(service_t _service,instance_t _instance,event_t _event,eventgroup_t _eventgroup,bool * _send_back_cached_event,bool * _send_back_cached_eventgroup)2063 void application_impl::check_send_back_cached_event(
2064         service_t _service, instance_t _instance, event_t _event,
2065         eventgroup_t _eventgroup, bool *_send_back_cached_event,
2066         bool *_send_back_cached_eventgroup) {
2067     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
2068     *_send_back_cached_event = false;
2069     *_send_back_cached_eventgroup = false;
2070     bool already_subscribed(false);
2071     auto found_service = subscriptions_.find(_service);
2072     if(found_service != subscriptions_.end()) {
2073         auto found_instance = found_service->second.find(_instance);
2074         if (found_instance != found_service->second.end()) {
2075             auto found_event = found_instance->second.find(_event);
2076             if (found_event != found_instance->second.end()) {
2077                 auto found_eventgroup = found_event->second.find(_eventgroup);
2078                 if (found_eventgroup != found_event->second.end()) {
2079                     already_subscribed = true;
2080                     if (found_eventgroup->second) {
2081                         // initial values for this event have already been
2082                         // received, send back cached value
2083                         if(_event == ANY_EVENT) {
2084                             *_send_back_cached_eventgroup = true;
2085                         } else {
2086                             *_send_back_cached_event = true;
2087                         }
2088                     }
2089                 }
2090             }
2091         }
2092     }
2093 
2094     if (!already_subscribed) {
2095         subscriptions_[_service][_instance][_event][_eventgroup] = false;
2096     }
2097 }
2098 
remove_subscription(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)2099 void application_impl::remove_subscription(service_t _service,
2100                                            instance_t _instance,
2101                                            eventgroup_t _eventgroup,
2102                                            event_t _event) {
2103 
2104     {
2105         auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _event);
2106         std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
2107         subscription_state_.erase(its_tuple);
2108     }
2109 
2110     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
2111 
2112     auto found_service = subscriptions_.find(_service);
2113     if(found_service != subscriptions_.end()) {
2114         auto found_instance = found_service->second.find(_instance);
2115         if (found_instance != found_service->second.end()) {
2116             auto found_event = found_instance->second.find(_event);
2117             if (found_event != found_instance->second.end()) {
2118                 if (found_event->second.erase(_eventgroup)) {
2119                     if (!found_event->second.size()) {
2120                         found_instance->second.erase(_event);
2121                         if (!found_instance->second.size()) {
2122                             found_service->second.erase(_instance);
2123                             if (!found_service->second.size()) {
2124                                 subscriptions_.erase(_service);
2125                             }
2126                         }
2127                     }
2128                 }
2129             }
2130         }
2131     }
2132 }
2133 
check_for_active_subscription(service_t _service,instance_t _instance,event_t _event)2134 bool application_impl::check_for_active_subscription(service_t _service,
2135                                                      instance_t _instance,
2136                                                      event_t _event) {
2137     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
2138     auto found_service = subscriptions_.find(_service);
2139     if(found_service != subscriptions_.end()) {
2140         auto found_instance = found_service->second.find(_instance);
2141         if (found_instance != found_service->second.end()) {
2142             auto found_event = found_instance->second.find(_event);
2143             if (found_event != found_instance->second.end()) {
2144                 if (found_event->second.size()) {
2145                     for (auto &eventgroup : found_event->second) {
2146                         eventgroup.second = true;
2147                     }
2148                     return true;
2149                 }
2150             } else {
2151                 // Received a event which nobody yet explicitly subscribed to.
2152                 // Check if someone subscribed to ANY_EVENT for one of
2153                 // the received event's eventgroups
2154                 auto found_any_event = found_instance->second.find(ANY_EVENT);
2155                 if (found_any_event != found_instance->second.end()) {
2156                     if (routing_) {
2157                         std::shared_ptr<event> its_event = routing_->find_event(
2158                                 _service, _instance, _event);
2159                         if (its_event) {
2160                             for (const auto& eg : its_event->get_eventgroups()) {
2161                                 auto found_eventgroup = found_any_event->second.find(eg);
2162                                 if (found_eventgroup != found_any_event->second.end()) {
2163                                     // set the flag for initial event received to true
2164                                     // even if we might not already received all of the
2165                                     // eventgroups events.
2166                                     found_eventgroup->second = true;
2167                                     return true;
2168                                 }
2169                             }
2170                         }
2171                     }
2172                 }
2173             }
2174         }
2175     }
2176     // Return false if an event was received from:
2177     // - a service which nobody yet subscribed to
2178     // - a service instance which nobody yet subscribed to
2179     // - a service instance and nobody yet subscribed to one of the event's
2180     //   eventgroups
2181     return false;
2182 }
2183 
check_subscription_state(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)2184 bool application_impl::check_subscription_state(service_t _service, instance_t _instance,
2185         eventgroup_t _eventgroup, event_t _event) {
2186     bool is_acknowledged(false);
2187     bool should_subscribe(true);
2188     {
2189         auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _event);
2190         std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
2191         auto its_subscription_state = subscription_state_.find(its_tuple);
2192         if (its_subscription_state != subscription_state_.end()) {
2193             if (its_subscription_state->second !=
2194                     subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED) {
2195                 // only return true if subscription is NACK
2196                 // as only then we need to subscribe!
2197                 should_subscribe = false;
2198                 if (its_subscription_state->second ==
2199                         subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
2200                     is_acknowledged = true;
2201                 }
2202             }
2203         } else {
2204             subscription_state_[its_tuple] = subscription_state_e::IS_SUBSCRIBING;
2205         }
2206     }
2207 
2208     if (!should_subscribe && is_acknowledged) {
2209         // Deliver subscription state only if ACK has already received
2210         deliver_subscription_state(_service, _instance, _eventgroup, _event, 0 /* OK */);
2211     }
2212 
2213     return should_subscribe;
2214 }
2215 
print_blocking_call(const std::shared_ptr<sync_handler> & _handler)2216 void application_impl::print_blocking_call(const std::shared_ptr<sync_handler>& _handler) {
2217     switch (_handler->handler_type_) {
2218         case handler_type_e::AVAILABILITY:
2219             VSOMEIP_WARNING << "BLOCKING CALL AVAILABILITY("
2220                 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
2221                 << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
2222                 << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "]";
2223             break;
2224         case handler_type_e::MESSAGE:
2225             VSOMEIP_WARNING << "BLOCKING CALL MESSAGE("
2226                 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
2227                 << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
2228                 << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "."
2229                 << std::hex << std::setw(4) << std::setfill('0') << _handler->method_id_ << ":"
2230                 << std::hex << std::setw(4) << std::setfill('0') << _handler->session_id_ << "]";
2231             break;
2232         case handler_type_e::STATE:
2233             VSOMEIP_WARNING << "BLOCKING CALL STATE("
2234                 << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")";
2235             break;
2236         case handler_type_e::SUBSCRIPTION:
2237             VSOMEIP_WARNING << "BLOCKING CALL SUBSCRIPTION("
2238                 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
2239                 << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
2240                 << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "."
2241                 << std::hex << std::setw(4) << std::setfill('0') << _handler->eventgroup_id_ << ":"
2242                 << std::hex << std::setw(4) << std::setfill('0') << _handler->method_id_ << "]";
2243             break;
2244         case handler_type_e::OFFERED_SERVICES_INFO:
2245             VSOMEIP_WARNING << "BLOCKING CALL OFFERED_SERVICES_INFO("
2246                 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")";
2247             break;
2248         case handler_type_e::WATCHDOG:
2249             VSOMEIP_WARNING << "BLOCKING CALL WATCHDOG("
2250                 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")";
2251             break;
2252         case handler_type_e::UNKNOWN:
2253             VSOMEIP_WARNING << "BLOCKING CALL UNKNOWN("
2254                 << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")";
2255             break;
2256     }
2257 }
2258 
2259 
get_offered_services_async(offer_type_e _offer_type,offered_services_handler_t _handler)2260 void application_impl::get_offered_services_async(offer_type_e _offer_type, offered_services_handler_t _handler) {
2261     {
2262         std::lock_guard<std::mutex> its_lock(offered_services_handler_mutex_);
2263         offered_services_handler_ = _handler;
2264     }
2265 
2266     if (!is_routing_manager_host_) {
2267         routing_->send_get_offered_services_info(get_client(), _offer_type);
2268     } else {
2269         std::vector<std::pair<service_t, instance_t>> its_services;
2270         auto its_routing_manager_host = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2271 
2272         for (const auto& s : its_routing_manager_host->get_offered_services()) {
2273             for (const auto& i : s.second) {
2274                 auto its_unreliable_endpoint = i.second->get_endpoint(false);
2275                 auto its_reliable_endpoint = i.second->get_endpoint(true);
2276 
2277                 if (_offer_type == offer_type_e::OT_LOCAL) {
2278                     if ( ((its_unreliable_endpoint && (its_unreliable_endpoint->get_local_port() == ILLEGAL_PORT))
2279                                 && (its_reliable_endpoint && (its_reliable_endpoint->get_local_port() == ILLEGAL_PORT)))
2280                                 || (!its_reliable_endpoint && !its_unreliable_endpoint)) {
2281                         its_services.push_back(std::make_pair(s.first, i.first));
2282                     }
2283                 } else if (_offer_type == offer_type_e::OT_REMOTE) {
2284                     if ((its_unreliable_endpoint && its_unreliable_endpoint->get_local_port() != ILLEGAL_PORT)
2285                                  || (its_reliable_endpoint && its_reliable_endpoint->get_local_port() != ILLEGAL_PORT)) {
2286                         its_services.push_back(std::make_pair(s.first, i.first));
2287                      }
2288                 } else if (_offer_type == offer_type_e::OT_ALL) {
2289                     its_services.push_back(std::make_pair(s.first, i.first));
2290                 }
2291             }
2292         }
2293         on_offered_services_info(its_services);
2294     }
2295     return;
2296 }
2297 
2298 
on_offered_services_info(std::vector<std::pair<service_t,instance_t>> & _services)2299 void application_impl::on_offered_services_info(std::vector<std::pair<service_t, instance_t>> &_services) {
2300     bool has_offered_services_handler(false);
2301     offered_services_handler_t handler = nullptr;
2302     {
2303         std::lock_guard<std::mutex> its_lock(offered_services_handler_mutex_);
2304         if (offered_services_handler_) {
2305             has_offered_services_handler = true;
2306             handler = offered_services_handler_;
2307         }
2308     }
2309     if (has_offered_services_handler) {
2310         std::lock_guard<std::mutex> its_lock(handlers_mutex_);
2311         std::shared_ptr<sync_handler> its_sync_handler
2312             = std::make_shared<sync_handler>([handler, _services]() {
2313                                                 handler(_services);
2314                                              });
2315         its_sync_handler->handler_type_ = handler_type_e::OFFERED_SERVICES_INFO;
2316         handlers_.push_back(its_sync_handler);
2317         dispatcher_condition_.notify_one();
2318     }
2319 }
2320 
watchdog_cbk(boost::system::error_code const & _error)2321 void application_impl::watchdog_cbk(boost::system::error_code const &_error) {
2322     if (!_error) {
2323 
2324         watchdog_handler_t handler = nullptr;
2325         {
2326             std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
2327             handler = watchdog_handler_;
2328             if (handler && std::chrono::seconds::zero() != watchdog_interval_) {
2329                 watchdog_timer_.expires_from_now(watchdog_interval_);
2330                 watchdog_timer_.async_wait(std::bind(&application_impl::watchdog_cbk,
2331                         this, std::placeholders::_1));
2332             }
2333         }
2334 
2335         if (handler) {
2336             std::lock_guard<std::mutex> its_lock(handlers_mutex_);
2337             std::shared_ptr<sync_handler> its_sync_handler
2338                 = std::make_shared<sync_handler>([handler]() { handler(); });
2339             its_sync_handler->handler_type_ = handler_type_e::WATCHDOG;
2340             handlers_.push_back(its_sync_handler);
2341             dispatcher_condition_.notify_one();
2342         }
2343     }
2344 }
2345 
set_watchdog_handler(watchdog_handler_t _handler,std::chrono::seconds _interval)2346 void application_impl::set_watchdog_handler(watchdog_handler_t _handler,
2347             std::chrono::seconds _interval) {
2348     if (_handler && std::chrono::seconds::zero() != _interval) {
2349         std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
2350         watchdog_handler_ = _handler;
2351         watchdog_interval_ = _interval;
2352         watchdog_timer_.expires_from_now(_interval);
2353         watchdog_timer_.async_wait(std::bind(&application_impl::watchdog_cbk,
2354                 this, std::placeholders::_1));
2355     } else {
2356         std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
2357         watchdog_timer_.cancel();
2358         watchdog_handler_ = nullptr;
2359         watchdog_interval_ = std::chrono::seconds::zero();
2360     }
2361 }
2362 
register_async_subscription_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,async_subscription_handler_t _handler)2363 void application_impl::register_async_subscription_handler(service_t _service,
2364     instance_t _instance, eventgroup_t _eventgroup,
2365     async_subscription_handler_t _handler) {
2366 
2367     std::lock_guard<std::mutex> its_lock(subscription_mutex_);
2368     subscription_[_service][_instance][_eventgroup] = std::make_pair(nullptr, _handler);;
2369 }
2370 
register_sd_acceptance_handler(sd_acceptance_handler_t _handler)2371 void application_impl::register_sd_acceptance_handler(
2372         sd_acceptance_handler_t _handler) {
2373     if (is_routing() && routing_) {
2374         const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2375         rm_impl->register_sd_acceptance_handler(_handler);
2376     }
2377 }
2378 
register_reboot_notification_handler(reboot_notification_handler_t _handler)2379 void application_impl::register_reboot_notification_handler(
2380         reboot_notification_handler_t _handler) {
2381     if (is_routing() && routing_) {
2382         const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2383         rm_impl->register_reboot_notification_handler(_handler);
2384     }
2385 }
2386 
set_sd_acceptance_required(const remote_info_t & _remote,const std::string & _path,bool _enable)2387 void application_impl::set_sd_acceptance_required(
2388         const remote_info_t &_remote, const std::string &_path, bool _enable) {
2389 
2390     if (!is_routing()) {
2391         return;
2392     }
2393 
2394     const boost::asio::ip::address its_address(_remote.ip_.is_v4_ ?
2395             static_cast<boost::asio::ip::address>(boost::asio::ip::address_v4(
2396                     _remote.ip_.address_.v4_)) :
2397             static_cast<boost::asio::ip::address>(boost::asio::ip::address_v6(
2398                     _remote.ip_.address_.v6_)));
2399 
2400     if (_remote.first_ == std::numeric_limits<std::uint16_t>::max()
2401             && _remote.last_ == 0) {
2402         // special case to (de)activate rules per IP
2403         configuration_->set_sd_acceptance_rules_active(its_address, _enable);
2404         return;
2405     }
2406 
2407     configuration::port_range_t its_range { _remote.first_, _remote.last_ };
2408     configuration_->set_sd_acceptance_rule(its_address,
2409             its_range, port_type_e::PT_UNKNOWN,
2410             _path, _remote.is_reliable_, _enable, true);
2411 
2412     if (_enable && routing_) {
2413         const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2414         rm_impl->sd_acceptance_enabled(its_address, its_range,
2415                 _remote.is_reliable_);
2416     }
2417 }
2418 
// Overload taking a whole map of remotes.
//
// Currently a no-op: both parameters are only cast to void and the
// implementation below is compiled out with "#if 0".
void application_impl::set_sd_acceptance_required(
        const sd_acceptance_map_type_t& _remotes, bool _enable) {

    // Silence unused-parameter warnings while the implementation is disabled.
    (void)_remotes;
    (void)_enable;

    // Disabled implementation: builds one rule set from all remotes and
    // passes it to the configuration in a single call.
#if 0
    if (!is_routing()) {
        return;
    }

    configuration::sd_acceptance_rules_t its_rules;
    for (const auto& remote_info : _remotes) {
        const boost::asio::ip::address its_address(remote_info.first.ip_.is_v4_ ?
                static_cast<boost::asio::ip::address>(boost::asio::ip::address_v4(
                        remote_info.first.ip_.address_.v4_)) :
                static_cast<boost::asio::ip::address>(boost::asio::ip::address_v6(
                        remote_info.first.ip_.address_.v6_)));
        const boost::icl::interval<std::uint16_t>::interval_type its_interval =
                remote_info.first.is_range_ ?
                    boost::icl::interval<std::uint16_t>::closed(
                            remote_info.first.first_,
                            ((remote_info.first.last_ == ANY_PORT) ?
                                    std::numeric_limits<std::uint16_t>::max() :
                                    remote_info.first.last_)) :
                    boost::icl::interval<std::uint16_t>::closed(
                            remote_info.first.first_, remote_info.first.first_);

        const bool its_reliability = remote_info.first.is_reliable_;

        const auto found_address = its_rules.find(its_address);
        if (found_address != its_rules.end()) {
            const auto found_reliability = found_address->second.second.find(
                    its_reliability);
            if (found_reliability != found_address->second.second.end()) {
                found_reliability->second.insert(its_interval);
            } else {
                found_address->second.second.emplace(std::make_pair(
                        its_reliability,
                        boost::icl::interval_set<std::uint16_t>(its_interval)));
            }
        } else {
            its_rules.insert(std::make_pair(its_address,
                   std::make_pair(remote_info.second,
                           std::map<bool, boost::icl::interval_set<std::uint16_t>>(
                                  {{ its_reliability,
                                      boost::icl::interval_set<std::uint16_t>(
                                              its_interval) } }))));
        }
    }

    configuration_->set_sd_acceptance_rules(its_rules, _enable);
#endif
}
2473 
2474 application::sd_acceptance_map_type_t
get_sd_acceptance_required()2475 application_impl::get_sd_acceptance_required() {
2476 
2477     sd_acceptance_map_type_t its_ret;
2478 
2479     if (is_routing()) {
2480         for (const auto& e : configuration_->get_sd_acceptance_rules()) {
2481             remote_info_t its_remote_info;
2482             its_remote_info.ip_.is_v4_ = e.first.is_v4();
2483             if (its_remote_info.ip_.is_v4_) {
2484                 its_remote_info.ip_.address_.v4_ = e.first.to_v4().to_bytes();
2485             } else {
2486                 its_remote_info.ip_.address_.v6_ = e.first.to_v6().to_bytes();
2487             }
2488             for (const auto& reliability : e.second.second) {
2489                 its_remote_info.is_reliable_ = reliability.first;
2490                 for (const auto& port_range : reliability.second.first) {
2491                     if (port_range.lower() == port_range.upper()) {
2492                         its_remote_info.first_ = port_range.lower();
2493                         its_remote_info.last_ = port_range.lower();
2494                         its_remote_info.is_range_ = false;
2495                     } else {
2496                         its_remote_info.first_ = port_range.lower();
2497                         its_remote_info.last_ = port_range.upper();
2498                         its_remote_info.is_range_ = true;
2499                     }
2500                     its_ret[its_remote_info] = e.second.first;
2501                 }
2502                 for (const auto& port_range : reliability.second.second) {
2503                     if (port_range.lower() == port_range.upper()) {
2504                         its_remote_info.first_ = port_range.lower();
2505                         its_remote_info.last_ = port_range.lower();
2506                         its_remote_info.is_range_ = false;
2507                     } else {
2508                         its_remote_info.first_ = port_range.lower();
2509                         its_remote_info.last_ = port_range.upper();
2510                         its_remote_info.is_range_ = true;
2511                     }
2512                     its_ret[its_remote_info] = e.second.first;
2513                 }
2514             }
2515         }
2516     }
2517 
2518     return its_ret;
2519 }
2520 
register_routing_ready_handler(routing_ready_handler_t _handler)2521 void application_impl::register_routing_ready_handler(
2522         routing_ready_handler_t _handler) {
2523     if (is_routing() && routing_) {
2524         const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2525         rm_impl->register_routing_ready_handler(_handler);
2526     }
2527 }
2528 
register_routing_state_handler(routing_state_handler_t _handler)2529 void application_impl::register_routing_state_handler(
2530         routing_state_handler_t _handler) {
2531     if (is_routing() && routing_) {
2532         const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2533         rm_impl->register_routing_state_handler(_handler);
2534     }
2535 }
2536 
update_service_configuration(service_t _service,instance_t _instance,std::uint16_t _port,bool _reliable,bool _magic_cookies_enabled,bool _offer)2537 bool application_impl::update_service_configuration(service_t _service,
2538                                                     instance_t _instance,
2539                                                     std::uint16_t _port,
2540                                                     bool _reliable,
2541                                                     bool _magic_cookies_enabled,
2542                                                     bool _offer) {
2543     bool ret = false;
2544     if (!is_routing_manager_host_) {
2545         VSOMEIP_ERROR << __func__ << " is only intended to be called by "
2546                 "application acting as routing manager host";
2547     } else if (!routing_) {
2548         VSOMEIP_ERROR << __func__ << " routing is zero";
2549     } else {
2550         auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2551         if (rm_impl) {
2552             if (_offer) {
2553                 ret = rm_impl->offer_service_remotely(_service, _instance,
2554                         _port, _reliable, _magic_cookies_enabled);
2555             } else {
2556                 ret = rm_impl->stop_offer_service_remotely(_service, _instance,
2557                         _port, _reliable, _magic_cookies_enabled);
2558             }
2559         }
2560     }
2561     return ret;
2562 }
2563 
update_security_policy_configuration(uint32_t _uid,uint32_t _gid,::std::shared_ptr<policy> _policy,std::shared_ptr<payload> _payload,security_update_handler_t _handler)2564 void application_impl::update_security_policy_configuration(uint32_t _uid,
2565                                                   uint32_t _gid,
2566                                                   ::std::shared_ptr<policy> _policy,
2567                                                   std::shared_ptr<payload> _payload,
2568                                                   security_update_handler_t _handler) {
2569     if (!is_routing()) {
2570         VSOMEIP_ERROR << __func__ << " is only intended to be called by "
2571                 "application acting as routing manager host";
2572     } else if (!routing_) {
2573         VSOMEIP_ERROR << __func__ << " routing is zero";
2574     } else {
2575         auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2576         if (rm_impl) {
2577             rm_impl->update_security_policy_configuration(_uid, _gid, _policy, _payload, _handler);
2578         }
2579     }
2580 }
2581 
remove_security_policy_configuration(uint32_t _uid,uint32_t _gid,security_update_handler_t _handler)2582 void application_impl::remove_security_policy_configuration(uint32_t _uid,
2583                                                   uint32_t _gid,
2584                                                   security_update_handler_t _handler) {
2585     if (!is_routing()) {
2586         VSOMEIP_ERROR << __func__ << " is only intended to be called by "
2587                 "application acting as routing manager host";
2588     } else if (!routing_) {
2589         VSOMEIP_ERROR << __func__ << " routing is zero";
2590     } else {
2591         auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2592         if (rm_impl) {
2593             rm_impl->remove_security_policy_configuration(_uid, _gid, _handler);
2594         }
2595     }
2596 }
2597 
2598 } // namespace vsomeip_v3
2599