1 // Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <future>
7 #include <thread>
8 #include <iomanip>
9 #include <iostream>
10
11 #include <boost/exception/diagnostic_information.hpp>
12
13 #ifndef _WIN32
14 #include <dlfcn.h>
15 #include <sys/syscall.h>
16 #endif
17
18 #include <vsomeip/defines.hpp>
19 #include <vsomeip/runtime.hpp>
20 #include <vsomeip/plugins/application_plugin.hpp>
21 #include <vsomeip/plugins/pre_configuration_plugin.hpp>
22 #include <vsomeip/internal/logger.hpp>
23
24 #include "../include/application_impl.hpp"
25 #ifdef VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
26 #include "../../configuration/include/configuration_impl.hpp"
27 #else
28 #include "../../configuration/include/configuration.hpp"
29 #include "../../configuration/include/configuration_plugin.hpp"
30 #endif // VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
31 #include "../../message/include/serializer.hpp"
32 #include "../../routing/include/routing_manager_impl.hpp"
33 #include "../../routing/include/routing_manager_proxy.hpp"
34 #include "../../utility/include/utility.hpp"
35 #include "../../tracing/include/connector_impl.hpp"
36 #include "../../plugin/include/plugin_manager_impl.hpp"
37 #include "../../endpoints/include/endpoint.hpp"
38 #include "../../security/include/security.hpp"
39
40 namespace vsomeip_v3 {
41
42 #ifdef ANDROID
~configuration()43 configuration::~configuration() {}
44 #endif
45
46 uint32_t application_impl::app_counter__ = 0;
47 std::mutex application_impl::app_counter_mutex__;
48
application_impl(const std::string & _name)49 application_impl::application_impl(const std::string &_name)
50 : runtime_(runtime::get()),
51 client_(VSOMEIP_CLIENT_UNSET),
52 session_(0),
53 is_initialized_(false), name_(_name),
54 work_(std::make_shared<boost::asio::io_service::work>(io_)),
55 routing_(0),
56 state_(state_type_e::ST_DEREGISTERED),
57 security_mode_(security_mode_e::SM_OFF),
58 #ifdef VSOMEIP_ENABLE_SIGNAL_HANDLING
59 signals_(io_, SIGINT, SIGTERM),
60 catched_signal_(false),
61 #endif
62 is_dispatching_(false),
63 max_dispatchers_(VSOMEIP_MAX_DISPATCHERS),
64 max_dispatch_time_(VSOMEIP_MAX_DISPATCH_TIME),
65 stopped_(false),
66 block_stopping_(false),
67 is_routing_manager_host_(false),
68 stopped_called_(false),
69 watchdog_timer_(io_),
70 client_side_logging_(false)
71 #ifdef VSOMEIP_HAS_SESSION_HANDLING_CONFIG
72 , has_session_handling_(true)
73 #endif // VSOMEIP_HAS_SESSION_HANDLING_CONFIG
74 {
75 own_uid_ = ANY_UID;
76 own_gid_ = ANY_GID;
77 #ifndef _WIN32
78 own_uid_ = getuid();
79 own_gid_ = getgid();
80 #endif
81 }
82
~application_impl()83 application_impl::~application_impl() {
84 runtime_->remove_application(name_);
85 try {
86 if (stop_thread_.joinable()) {
87 stop_thread_.detach();
88 }
89 } catch (const std::exception& e) {
90 std::cerr << __func__ << " catched exception (shutdown): " << e.what() << std::endl;
91 }
92
93 try {
94 std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
95 for (const auto& t : io_threads_) {
96 if (t->joinable()) {
97 t->detach();
98 }
99 }
100 io_threads_.clear();
101 } catch (const std::exception& e) {
102 std::cerr << __func__ << " catched exception (io threads): " << e.what() << std::endl;
103 }
104
105 try {
106 std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
107 for (const auto& its_dispatcher : dispatchers_) {
108 if (its_dispatcher.second->joinable()) {
109 its_dispatcher.second->detach();
110 }
111 }
112 dispatchers_.clear();
113 } catch (const std::exception& e) {
114 std::cerr << __func__ << " catched exception (dispatchers): " << e.what() << std::endl;
115 }
116 }
117
init()118 bool application_impl::init() {
119 if(is_initialized_) {
120 VSOMEIP_WARNING << "Trying to initialize an already initialized application.";
121 return true;
122 }
123 // Application name
124 if (name_ == "") {
125 const char *its_name = getenv(VSOMEIP_ENV_APPLICATION_NAME);
126 if (nullptr != its_name) {
127 name_ = its_name;
128 }
129 }
130
131 std::string configuration_path;
132
133 // load configuration from module
134 std::string config_module = "";
135 const char *its_config_module = getenv(VSOMEIP_ENV_CONFIGURATION_MODULE);
136 if (nullptr != its_config_module) {
137 // TODO: Add loading of custom configuration module
138 } else { // load default module
139 #ifndef VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
140 auto its_plugin = plugin_manager::get()->get_plugin(
141 plugin_type_e::CONFIGURATION_PLUGIN, VSOMEIP_CFG_LIBRARY);
142 if (its_plugin) {
143 auto its_configuration_plugin
144 = std::dynamic_pointer_cast<configuration_plugin>(its_plugin);
145 if (its_configuration_plugin) {
146 configuration_ = its_configuration_plugin->get_configuration(name_);
147 VSOMEIP_INFO << "Configuration module loaded.";
148 } else {
149 std::cerr << "Invalid configuration module!" << std::endl;
150 std::exit(EXIT_FAILURE);
151 }
152 } else {
153 std::cerr << "Configuration module could not be loaded!" << std::endl;
154 std::exit(EXIT_FAILURE);
155 }
156 #else
157 configuration_ = std::dynamic_pointer_cast<configuration>(
158 std::make_shared<vsomeip_v3::cfg::configuration_impl>());
159 if (configuration_path.length()) {
160 configuration_->set_configuration_path(configuration_path);
161 }
162 configuration_->load(name_);
163 #endif // VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
164 }
165
166 // Set security mode
167 auto its_security = security::get();
168 if (its_security->is_enabled()) {
169 if (its_security->is_audit()) {
170 security_mode_ = security_mode_e::SM_AUDIT;
171 } else {
172 security_mode_ = security_mode_e::SM_ON;
173 }
174 } else {
175 security_mode_ = security_mode_e::SM_OFF;
176 }
177
178 const char *client_side_logging = getenv(VSOMEIP_ENV_CLIENTSIDELOGGING);
179 if (client_side_logging != nullptr) {
180 client_side_logging_ = true;
181 VSOMEIP_INFO << "Client side logging for application: " << name_
182 << " is enabled";
183
184 if ('\0' != *client_side_logging) {
185 std::stringstream its_converter(client_side_logging);
186 if ('"' == its_converter.peek()) {
187 its_converter.get(); // skip quote
188 }
189 uint16_t val(0xffffu);
190 bool stop_parsing(false);
191 do {
192 const uint16_t prev_val(val);
193 its_converter >> std::hex >> std::setw(4) >> val;
194 if (its_converter.good()) {
195 const std::stringstream::int_type c = its_converter.eof()?'\0':its_converter.get();
196 switch (c) {
197 case '"':
198 case '.':
199 case ':':
200 case ' ':
201 case '\0': {
202 if ('.' != c) {
203 if (0xffffu == prev_val) {
204 VSOMEIP_INFO << "+filter "
205 << std::hex << std::setw(4) << std::setfill('0') << val;
206 client_side_logging_filter_.insert(std::make_tuple(val, ANY_INSTANCE));
207 } else {
208 VSOMEIP_INFO << "+filter "
209 << std::hex << std::setw(4) << std::setfill('0') << prev_val << "."
210 << std::hex << std::setw(4) << std::setfill('0') << val;
211 client_side_logging_filter_.insert(std::make_tuple(prev_val, val));
212 }
213 val = 0xffffu;
214 }
215 }
216 break;
217 default:
218 stop_parsing = true;
219 break;
220 }
221 }
222 }
223 while (!stop_parsing && its_converter.good());
224 }
225 }
226
227 std::shared_ptr<configuration> its_configuration = get_configuration();
228 if (its_configuration) {
229 VSOMEIP_INFO << "Initializing vsomeip application \"" << name_ << "\".";
230 client_ = its_configuration->get_id(name_);
231
232 // Max dispatchers is the configured maximum number of dispatchers and
233 // the main dispatcher
234 max_dispatchers_ = its_configuration->get_max_dispatchers(name_) + 1;
235 max_dispatch_time_ = its_configuration->get_max_dispatch_time(name_);
236
237 #ifdef VSOMEIP_HAS_SESSION_HANDLING_CONFIG
238 has_session_handling_ = its_configuration->has_session_handling(name_);
239 if (!has_session_handling_)
240 VSOMEIP_INFO << "application: " << name_
241 << " has session handling switched off!";
242 #endif // VSOMEIP_HAS_SESSION_HANDLING_CONFIG
243
244 std::string its_routing_host = its_configuration->get_routing_host();
245 if (its_routing_host != "") {
246 is_routing_manager_host_ = (its_routing_host == name_);
247 if (is_routing_manager_host_ &&
248 !utility::is_routing_manager(configuration_)) {
249 #ifndef VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
250 VSOMEIP_ERROR << "application: " << name_ << " configured as "
251 "routing but other routing manager present. Won't "
252 "instantiate routing";
253 is_routing_manager_host_ = false;
254 return false;
255 #else
256 is_routing_manager_host_ = true;
257 #endif // VSOMEIP_ENABLE_MULTIPLE_ROUTING_MANAGERS
258 }
259 } else {
260 is_routing_manager_host_ = utility::is_routing_manager(configuration_);
261 }
262
263 if (is_routing_manager_host_) {
264 VSOMEIP_INFO << "Instantiating routing manager [Host].";
265 if (client_ == VSOMEIP_CLIENT_UNSET) {
266 client_ = static_cast<client_t>(
267 (configuration_->get_diagnosis_address() << 8)
268 & configuration_->get_diagnosis_mask());
269 utility::request_client_id(configuration_, name_, client_);
270 }
271 routing_ = std::make_shared<routing_manager_impl>(this);
272 } else {
273 VSOMEIP_INFO << "Instantiating routing manager [Proxy].";
274 routing_ = std::make_shared<routing_manager_proxy>(this, client_side_logging_, client_side_logging_filter_);
275 }
276
277 routing_->set_client(client_);
278 routing_->init();
279
280 #ifdef USE_DLT
281 // Tracing
282 std::shared_ptr<trace::connector_impl> its_connector
283 = trace::connector_impl::get();
284 std::shared_ptr<cfg::trace> its_trace_configuration
285 = its_configuration->get_trace();
286 its_connector->configure(its_trace_configuration);
287 #endif
288
289 VSOMEIP_INFO << "Application(" << (name_ != "" ? name_ : "unnamed")
290 << ", " << std::hex << std::setw(4) << std::setfill('0') << client_
291 << ") is initialized ("
292 << std::dec << max_dispatchers_ << ", "
293 << std::dec << max_dispatch_time_ << ").";
294
295 is_initialized_ = true;
296 }
297
298 #ifdef VSOMEIP_ENABLE_SIGNAL_HANDLING
299 if (is_initialized_) {
300 signals_.add(SIGINT);
301 signals_.add(SIGTERM);
302
303 // Register signal handler
304 auto its_signal_handler =
305 [this] (boost::system::error_code const &_error, int _signal) {
306 if (!_error) {
307 switch (_signal) {
308 case SIGTERM:
309 case SIGINT:
310 catched_signal_ = true;
311 stop();
312 break;
313 default:
314 break;
315 }
316 }
317 };
318 signals_.async_wait(its_signal_handler);
319 }
320 #endif
321
322 if (configuration_) {
323 auto its_plugins = configuration_->get_plugins(name_);
324 auto its_app_plugin_info = its_plugins.find(plugin_type_e::APPLICATION_PLUGIN);
325 if (its_app_plugin_info != its_plugins.end()) {
326 for (auto its_library : its_app_plugin_info->second) {
327 auto its_application_plugin = plugin_manager::get()->get_plugin(
328 plugin_type_e::APPLICATION_PLUGIN, its_library);
329 if (its_application_plugin) {
330 VSOMEIP_INFO << "Client 0x" << std::hex << get_client()
331 << " Loading plug-in library: " << its_library << " succeeded!";
332 std::dynamic_pointer_cast<application_plugin>(its_application_plugin)->
333 on_application_state_change(name_, application_plugin_state_e::STATE_INITIALIZED);
334 }
335 }
336 }
337 } else {
338 std::cerr << "Configuration module could not be loaded!" << std::endl;
339 std::exit(EXIT_FAILURE);
340 }
341
342 return is_initialized_;
343 }
344
start()345 void application_impl::start() {
346 #ifndef _WIN32
347 if (getpid() != static_cast<pid_t>(syscall(SYS_gettid))) {
348 // only set threadname if calling thread isn't the main thread
349 std::stringstream s;
350 s << std::hex << std::setw(4) << std::setfill('0') << client_
351 << "_io" << std::setw(2) << std::setfill('0') << 0;
352 pthread_setname_np(pthread_self(),s.str().c_str());
353 }
354 #endif
355 if (!is_initialized_) {
356 VSOMEIP_ERROR << "Trying to start an unintialized application.";
357 return;
358 }
359
360 const size_t io_thread_count = configuration_->get_io_thread_count(name_);
361 const int io_thread_nice_level = configuration_->get_io_thread_nice_level(name_);
362 {
363 std::lock_guard<std::mutex> its_lock(start_stop_mutex_);
364 if (io_.stopped()) {
365 io_.reset();
366 } else if(stop_thread_.joinable()) {
367 VSOMEIP_ERROR << "Trying to start an already started application.";
368 return;
369 }
370 if (stopped_) {
371 {
372 std::lock_guard<std::mutex> its_lock_start_stop(block_stop_mutex_);
373 block_stopping_ = true;
374 block_stop_cv_.notify_all();
375 }
376
377 stopped_ = false;
378 return;
379 }
380 stopped_ = false;
381 stopped_called_ = false;
382 VSOMEIP_INFO << "Starting vsomeip application \"" << name_ << "\" ("
383 << std::hex << std::setw(4) << std::setfill('0') << client_
384 << ") using " << std::dec << io_thread_count << " threads"
385 #ifndef _WIN32
386 << " I/O nice " << io_thread_nice_level
387 #endif
388 ;
389
390 start_caller_id_ = std::this_thread::get_id();
391 {
392 std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
393 is_dispatching_ = true;
394 auto its_main_dispatcher = std::make_shared<std::thread>(
395 std::bind(&application_impl::main_dispatch, shared_from_this()));
396 dispatchers_[its_main_dispatcher->get_id()] = its_main_dispatcher;
397 }
398
399 if (stop_thread_.joinable()) {
400 stop_thread_.join();
401 }
402 stop_thread_= std::thread(&application_impl::shutdown, shared_from_this());
403
404 if (routing_)
405 routing_->start();
406
407 for (size_t i = 0; i < io_thread_count - 1; i++) {
408 std::shared_ptr<std::thread> its_thread
409 = std::make_shared<std::thread>([this, i, io_thread_nice_level] {
410 VSOMEIP_INFO << "io thread id from application: "
411 << std::hex << std::setw(4) << std::setfill('0')
412 << client_ << " (" << name_ << ") is: " << std::hex
413 << std::this_thread::get_id()
414 #ifndef _WIN32
415 << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
416 #endif
417 ;
418 #ifndef _WIN32
419 {
420 std::stringstream s;
421 s << std::hex << std::setw(4) << std::setfill('0')
422 << client_ << "_io" << std::setw(2)
423 << std::setfill('0') << i+1;
424 pthread_setname_np(pthread_self(),s.str().c_str());
425 }
426 if ((VSOMEIP_IO_THREAD_NICE_LEVEL != io_thread_nice_level) && (io_thread_nice_level != nice(io_thread_nice_level))) {
427 VSOMEIP_WARNING << "nice(" << io_thread_nice_level << ") failed " << errno << " for " << std::this_thread::get_id();
428 }
429 #endif
430 try {
431 io_.run();
432 } catch (const std::exception &e) {
433 VSOMEIP_ERROR << "application_impl::start() "
434 "catched exception: " << e.what();
435 }
436 });
437 io_threads_.insert(its_thread);
438 }
439 }
440
441 auto its_plugins = configuration_->get_plugins(name_);
442 auto its_app_plugin_info = its_plugins.find(plugin_type_e::APPLICATION_PLUGIN);
443 if (its_app_plugin_info != its_plugins.end()) {
444 for (const auto& its_library : its_app_plugin_info->second) {
445 auto its_application_plugin = plugin_manager::get()->get_plugin(
446 plugin_type_e::APPLICATION_PLUGIN, its_library);
447 if (its_application_plugin) {
448 std::dynamic_pointer_cast<application_plugin>(its_application_plugin)->
449 on_application_state_change(name_, application_plugin_state_e::STATE_STARTED);
450 }
451 }
452
453 }
454
455 app_counter_mutex__.lock();
456 app_counter__++;
457 app_counter_mutex__.unlock();
458 VSOMEIP_INFO << "io thread id from application: "
459 << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
460 << name_ << ") is: " << std::hex << std::this_thread::get_id()
461 #ifndef _WIN32
462 << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
463 #endif
464 ;
465 #ifndef _WIN32
466 if ((VSOMEIP_IO_THREAD_NICE_LEVEL != io_thread_nice_level) && (io_thread_nice_level != nice(io_thread_nice_level))) {
467 VSOMEIP_WARNING << "nice(" << io_thread_nice_level << ") failed " << errno << " for " << std::this_thread::get_id();
468 }
469 #endif
470 try {
471 io_.run();
472
473 if (stop_thread_.joinable()) {
474 stop_thread_.join();
475 }
476
477 } catch (const std::exception &e) {
478 VSOMEIP_ERROR << "application_impl::start() catched exception: " << e.what();
479 }
480
481 {
482 std::lock_guard<std::mutex> its_lock_start_stop(block_stop_mutex_);
483 block_stopping_ = true;
484 block_stop_cv_.notify_all();
485 }
486
487 {
488 std::lock_guard<std::mutex> its_lock(start_stop_mutex_);
489 stopped_ = false;
490 }
491
492 app_counter_mutex__.lock();
493 app_counter__--;
494
495 #ifdef VSOMEIP_ENABLE_SIGNAL_HANDLING
496 if (catched_signal_ && !app_counter__) {
497 app_counter_mutex__.unlock();
498 VSOMEIP_INFO << "Exiting vsomeip application...";
499 exit(0);
500 }
501 #endif
502 app_counter_mutex__.unlock();
503 }
504
stop()505 void application_impl::stop() {
506
507 VSOMEIP_INFO << "Stopping vsomeip application \"" << name_ << "\" ("
508 << std::hex << std::setw(4) << std::setfill('0') << client_ << ").";
509
510 bool block = true;
511 {
512 std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
513 if (stopped_ || stopped_called_) {
514 return;
515 }
516 stop_caller_id_ = std::this_thread::get_id();
517 stopped_ = true;
518 stopped_called_ = true;
519 for (const auto& thread : io_threads_) {
520 if (thread->get_id() == std::this_thread::get_id()) {
521 block = false;
522 }
523 }
524 if (start_caller_id_ == stop_caller_id_) {
525 block = false;
526 }
527 }
528 auto its_plugins = configuration_->get_plugins(name_);
529 auto its_app_plugin_info = its_plugins.find(plugin_type_e::APPLICATION_PLUGIN);
530 if (its_app_plugin_info != its_plugins.end()) {
531 for (const auto& its_library : its_app_plugin_info->second) {
532 auto its_application_plugin = plugin_manager::get()->get_plugin(
533 plugin_type_e::APPLICATION_PLUGIN, its_library);
534 if (its_application_plugin) {
535 std::dynamic_pointer_cast<application_plugin>(its_application_plugin)->
536 on_application_state_change(name_, application_plugin_state_e::STATE_STOPPED);
537 }
538 }
539
540 }
541
542 {
543 std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
544 stop_cv_.notify_one();
545 }
546
547 if (block) {
548 std::unique_lock<std::mutex> block_stop_lock(block_stop_mutex_);
549 while (!block_stopping_) {
550 block_stop_cv_.wait(block_stop_lock);
551 }
552 block_stopping_ = false;
553 }
554 }
555
process(int _number)556 void application_impl::process(int _number) {
557 (void)_number;
558 VSOMEIP_ERROR << "application::process is not (yet) implemented.";
559 }
560
get_security_mode() const561 security_mode_e application_impl::get_security_mode() const {
562 return security_mode_;
563 }
564
offer_service(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)565 void application_impl::offer_service(service_t _service, instance_t _instance,
566 major_version_t _major, minor_version_t _minor) {
567 if (routing_)
568 routing_->offer_service(client_, _service, _instance, _major, _minor);
569 }
570
stop_offer_service(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)571 void application_impl::stop_offer_service(service_t _service, instance_t _instance,
572 major_version_t _major, minor_version_t _minor) {
573 if (routing_)
574 routing_->stop_offer_service(client_, _service, _instance, _major, _minor);
575 }
576
request_service(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)577 void application_impl::request_service(service_t _service, instance_t _instance,
578 major_version_t _major, minor_version_t _minor) {
579 if (routing_)
580 routing_->request_service(client_, _service, _instance, _major, _minor);
581 }
582
release_service(service_t _service,instance_t _instance)583 void application_impl::release_service(service_t _service,
584 instance_t _instance) {
585 if (routing_)
586 routing_->release_service(client_, _service, _instance);
587 }
588
subscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,event_t _event)589 void application_impl::subscribe(service_t _service, instance_t _instance,
590 eventgroup_t _eventgroup,
591 major_version_t _major,
592 event_t _event) {
593 if (routing_) {
594 bool send_back_cached(false);
595 bool send_back_cached_group(false);
596 check_send_back_cached_event(_service, _instance, _event, _eventgroup,
597 &send_back_cached, &send_back_cached_group);
598
599 if (send_back_cached) {
600 send_back_cached_event(_service, _instance, _event);
601 } else if(send_back_cached_group) {
602 send_back_cached_eventgroup(_service, _instance, _eventgroup);
603 }
604
605 if (check_subscription_state(_service, _instance, _eventgroup, _event)) {
606 routing_->subscribe(client_, own_uid_, own_gid_, _service, _instance, _eventgroup, _major,
607 _event);
608 }
609 }
610 }
611
unsubscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup)612 void application_impl::unsubscribe(service_t _service, instance_t _instance,
613 eventgroup_t _eventgroup) {
614 remove_subscription(_service, _instance, _eventgroup, ANY_EVENT);
615 if (routing_)
616 routing_->unsubscribe(client_, own_uid_, own_gid_, _service, _instance, _eventgroup, ANY_EVENT);
617 }
618
unsubscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)619 void application_impl::unsubscribe(service_t _service, instance_t _instance,
620 eventgroup_t _eventgroup, event_t _event) {
621 remove_subscription(_service, _instance, _eventgroup, _event);
622 if (routing_)
623 routing_->unsubscribe(client_, own_uid_, own_gid_, _service, _instance, _eventgroup, _event);
624 }
625
is_available(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const626 bool application_impl::is_available(
627 service_t _service, instance_t _instance,
628 major_version_t _major, minor_version_t _minor) const {
629 std::lock_guard<std::recursive_mutex> its_lock(availability_mutex_);
630 return is_available_unlocked(_service, _instance, _major, _minor);
631 }
632
is_available_unlocked(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const633 bool application_impl::is_available_unlocked(
634 service_t _service, instance_t _instance,
635 major_version_t _major, minor_version_t _minor) const {
636
637 bool is_available(false);
638
639 auto check_major_minor = [&](const std::map<instance_t,
640 std::map<major_version_t,
641 minor_version_t >>::const_iterator &_found_instance) {
642 auto found_major = _found_instance->second.find(_major);
643 if (found_major != _found_instance->second.end()) {
644 if (_minor <= found_major->second || _minor == ANY_MINOR
645 || _minor == DEFAULT_MINOR) {
646 is_available = true;
647 }
648 } else if ((_major == DEFAULT_MAJOR || _major == ANY_MAJOR)) {
649 for (const auto &found_major : _found_instance->second) {
650 if (_minor == DEFAULT_MINOR || _minor == ANY_MINOR) {
651 is_available = true;
652 break;
653 } else if (_minor <= found_major.second) {
654 is_available = true;
655 break;
656 }
657 }
658 }
659 };
660 auto found_service = available_.find(_service);
661 if (found_service != available_.end()) {
662 auto found_instance = found_service->second.find(_instance);
663 if (found_instance != found_service->second.end()) {
664 check_major_minor(found_instance);
665 } else if (_instance == ANY_INSTANCE) {
666 for (auto it = found_service->second.cbegin();
667 it != found_service->second.cend(); it++) {
668 check_major_minor(it);
669 if (is_available) {
670 break;
671 }
672 }
673 }
674 } else if (_service == ANY_SERVICE) {
675 for (const auto &found_service : available_) {
676 auto found_instance = found_service.second.find(_instance);
677 if (found_instance != found_service.second.end()) {
678 check_major_minor(found_instance);
679 if (is_available) {
680 break;
681 }
682 } else if (_instance == ANY_INSTANCE) {
683 for (auto it = found_service.second.cbegin();
684 it != found_service.second.cend(); it++) {
685 check_major_minor(it);
686 if (is_available) {
687 break;
688 }
689 }
690 }
691 if (is_available) {
692 break;
693 }
694 }
695 }
696 return is_available;
697 }
698
are_available(available_t & _available,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const699 bool application_impl::are_available(
700 available_t &_available,
701 service_t _service, instance_t _instance,
702 major_version_t _major, minor_version_t _minor) const {
703 std::lock_guard<std::recursive_mutex> its_lock(availability_mutex_);
704 return are_available_unlocked(_available, _service, _instance, _major, _minor);
705 }
706
are_available_unlocked(available_t & _available,service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor) const707 bool application_impl::are_available_unlocked(available_t &_available,
708 service_t _service, instance_t _instance,
709 major_version_t _major, minor_version_t _minor) const {
710
711 //find available services
712 if(_service == ANY_SERVICE) {
713 //add all available services
714 for(auto its_available_services_it = available_.begin();
715 its_available_services_it != available_.end();
716 ++its_available_services_it)
717 _available[its_available_services_it->first];
718 } else {
719 // check if specific service is available
720 if(available_.find(_service) != available_.end())
721 _available[_service];
722 }
723
724 //find available instances
725 //iterate through found available services
726 for(auto its_available_services_it = _available.begin();
727 its_available_services_it != _available.end();
728 ++its_available_services_it) {
729 //get available service
730 auto found_available_service = available_.find(its_available_services_it->first);
731 if (found_available_service != available_.end()) {
732 if(_instance == ANY_INSTANCE) {
733 //add all available instances
734 for(auto its_available_instances_it = found_available_service->second.begin();
735 its_available_instances_it != found_available_service->second.end();
736 ++its_available_instances_it)
737 _available[its_available_services_it->first][its_available_instances_it->first];
738 } else {
739 if(found_available_service->second.find(_instance) != found_available_service->second.end())
740 _available[its_available_services_it->first][_instance];
741 }
742 }
743 }
744
745 //find major versions
746 //iterate through found available services
747 for(auto its_available_services_it = _available.begin();
748 its_available_services_it != _available.end();
749 ++its_available_services_it) {
750 //get available service
751 auto found_available_service = available_.find(its_available_services_it->first);
752 if (found_available_service != available_.end()) {
753 //iterate through found available instances
754 for(auto its_available_instances_it = found_available_service->second.begin();
755 its_available_instances_it != found_available_service->second.end();
756 ++its_available_instances_it) {
757 //get available instance
758 auto found_available_instance = found_available_service->second.find(its_available_instances_it->first);
759 if(found_available_instance != found_available_service->second.end()) {
760 if(_major == ANY_MAJOR || _major == DEFAULT_MAJOR) {
761 //add all major versions
762 for(auto its_available_major_it = found_available_instance->second.begin();
763 its_available_major_it != found_available_instance->second.end();
764 ++its_available_major_it)
765 _available[its_available_services_it->first][its_available_instances_it->first][its_available_major_it->first];
766 } else {
767 if(found_available_instance->second.find(_major) != found_available_instance->second.end())
768 _available[its_available_services_it->first][its_available_instances_it->first][_major];
769 }
770 }
771 }
772 }
773 }
774
775 //find minor
776 //iterate through found available services
777 auto its_available_services_it = _available.begin();
778 while(its_available_services_it != _available.end()) {
779 bool found_minor(false);
780 //get available service
781 auto found_available_service = available_.find(its_available_services_it->first);
782 if (found_available_service != available_.end()) {
783 //iterate through found available instances
784 for(auto its_available_instances_it = found_available_service->second.begin();
785 its_available_instances_it != found_available_service->second.end();
786 ++its_available_instances_it) {
787 //get available instance
788 auto found_available_instance = found_available_service->second.find(its_available_instances_it->first);
789 if(found_available_instance != found_available_service->second.end()) {
790 //iterate through found available major version
791 for(auto its_available_major_it = found_available_instance->second.begin();
792 its_available_major_it != found_available_instance->second.end();
793 ++its_available_major_it) {
794 //get available major version
795 auto found_available_major = found_available_instance->second.find(its_available_major_it->first);
796 if(found_available_major != found_available_instance->second.end()) {
797 if(_minor == ANY_MINOR || _minor == DEFAULT_MINOR
798 || _minor <= found_available_major->second) {
799 //add minor version
800 _available[its_available_services_it->first][its_available_instances_it->first][its_available_major_it->first] = found_available_major->second;
801 found_minor = true;
802 }
803 }
804 }
805 }
806 }
807 }
808 if(found_minor)
809 ++its_available_services_it;
810 else
811 its_available_services_it = _available.erase(its_available_services_it);
812 }
813
814 if(_available.empty()) {
815 _available[_service][_instance][_major] = _minor ;
816 return false;
817 }
818 return true;
819 }
820
send(std::shared_ptr<message> _message)821 void application_impl::send(std::shared_ptr<message> _message) {
822 bool is_request = utility::is_request(_message);
823 if (client_side_logging_
824 && (client_side_logging_filter_.empty()
825 || (1 == client_side_logging_filter_.count(std::make_tuple(_message->get_service(), ANY_INSTANCE)))
826 || (1 == client_side_logging_filter_.count(std::make_tuple(_message->get_service(), _message->get_instance()))))) {
827 VSOMEIP_INFO << "application_impl::send: ("
828 << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
829 << std::hex << std::setw(4) << std::setfill('0') << _message->get_service() << "."
830 << std::hex << std::setw(4) << std::setfill('0') << _message->get_instance() << "."
831 << std::hex << std::setw(4) << std::setfill('0') << _message->get_method() << ":"
832 << std::hex << std::setw(4) << std::setfill('0')
833 << ((is_request) ? session_ : _message->get_session()) << ":"
834 << std::hex << std::setw(4) << std::setfill('0')
835 << ((is_request) ? client_.load() : _message->get_client()) << "] "
836 << "type=" << std::hex << static_cast<std::uint32_t>(_message->get_message_type())
837 << " thread=" << std::hex << std::this_thread::get_id();
838 }
839 if (routing_) {
840 // in case of requests set the request-id (client-id|session-id)
841 if (is_request) {
842 _message->set_client(client_);
843 _message->set_session(get_session());
844 }
845 // Always increment the session-id
846 (void)routing_->send(client_, _message);
847 }
848 }
849
notify(service_t _service,instance_t _instance,event_t _event,std::shared_ptr<payload> _payload,bool _force) const850 void application_impl::notify(service_t _service, instance_t _instance,
851 event_t _event, std::shared_ptr<payload> _payload, bool _force) const {
852 if (routing_)
853 routing_->notify(_service, _instance, _event, _payload, _force);
854 }
855
notify_one(service_t _service,instance_t _instance,event_t _event,std::shared_ptr<payload> _payload,client_t _client,bool _force) const856 void application_impl::notify_one(service_t _service, instance_t _instance,
857 event_t _event, std::shared_ptr<payload> _payload,
858 client_t _client, bool _force) const {
859 if (routing_) {
860 routing_->notify_one(_service, _instance, _event, _payload, _client,
861 _force
862 #ifdef VSOMEIP_ENABLE_COMPAT
863 , false
864 #endif
865 );
866 }
867 }
868
register_state_handler(state_handler_t _handler)869 void application_impl::register_state_handler(state_handler_t _handler) {
870 std::lock_guard<std::mutex> its_lock(state_handler_mutex_);
871 handler_ = _handler;
872 }
873
unregister_state_handler()874 void application_impl::unregister_state_handler() {
875 std::lock_guard<std::mutex> its_lock(state_handler_mutex_);
876 handler_ = nullptr;
877 }
878
register_availability_handler(service_t _service,instance_t _instance,availability_handler_t _handler,major_version_t _major,minor_version_t _minor)879 void application_impl::register_availability_handler(service_t _service,
880 instance_t _instance, availability_handler_t _handler,
881 major_version_t _major, minor_version_t _minor) {
882 std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
883 if (state_ == state_type_e::ST_REGISTERED) {
884 do_register_availability_handler(_service, _instance,
885 _handler, _major, _minor);
886 } else {
887 availability_[_service][_instance][_major][_minor] = std::make_pair(
888 _handler, false);
889 }
890 }
891
do_register_availability_handler(service_t _service,instance_t _instance,availability_handler_t _handler,major_version_t _major,minor_version_t _minor)892 void application_impl::do_register_availability_handler(service_t _service,
893 instance_t _instance, availability_handler_t _handler,
894 major_version_t _major, minor_version_t _minor) {
895 available_t available;
896 bool are_available = are_available_unlocked(available, _service, _instance, _major, _minor);
897 availability_[_service][_instance][_major][_minor] = std::make_pair(
898 _handler, true);
899
900 std::lock_guard<std::mutex> handlers_lock(handlers_mutex_);
901
902 std::shared_ptr<sync_handler> its_sync_handler
903 = std::make_shared<sync_handler>([_handler, are_available, available]() {
904 for(const auto& available_services_it : available)
905 for(const auto& available_instances_it : available_services_it.second)
906 _handler(available_services_it.first, available_instances_it.first, are_available);
907 });
908 its_sync_handler->handler_type_ = handler_type_e::AVAILABILITY;
909 its_sync_handler->service_id_ = _service;
910 its_sync_handler->instance_id_ = _instance;
911 handlers_.push_back(its_sync_handler);
912
913 dispatcher_condition_.notify_one();
914 }
915
unregister_availability_handler(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor)916 void application_impl::unregister_availability_handler(service_t _service,
917 instance_t _instance, major_version_t _major, minor_version_t _minor) {
918 std::lock_guard<std::recursive_mutex> its_lock(availability_mutex_);
919 auto found_service = availability_.find(_service);
920 if (found_service != availability_.end()) {
921 auto found_instance = found_service->second.find(_instance);
922 if (found_instance != found_service->second.end()) {
923 auto found_major = found_instance->second.find(_major);
924 if (found_major != found_instance->second.end()) {
925 auto found_minor = found_major->second.find(_minor);
926 if (found_minor != found_major->second.end()) {
927 found_major->second.erase(_minor);
928
929 if (!found_major->second.size()) {
930 found_instance->second.erase(_major);
931 if (!found_instance->second.size()) {
932 found_service->second.erase(_instance);
933 if (!found_service->second.size()) {
934 availability_.erase(_service);
935 }
936 }
937 }
938 }
939 }
940 }
941 }
942 }
943
on_subscription(service_t _service,instance_t _instance,eventgroup_t _eventgroup,client_t _client,uid_t _uid,gid_t _gid,bool _subscribed,std::function<void (bool)> _accepted_cb)944 void application_impl::on_subscription(service_t _service, instance_t _instance,
945 eventgroup_t _eventgroup, client_t _client, uid_t _uid, gid_t _gid,
946 bool _subscribed, std::function<void(bool)> _accepted_cb) {
947 bool handler_found = false;
948 std::pair<subscription_handler_t, async_subscription_handler_t> its_handlers;
949 {
950 std::lock_guard<std::mutex> its_lock(subscription_mutex_);
951 auto found_service = subscription_.find(_service);
952 if (found_service != subscription_.end()) {
953 auto found_instance = found_service->second.find(_instance);
954 if (found_instance != found_service->second.end()) {
955 auto found_eventgroup = found_instance->second.find(_eventgroup);
956 if (found_eventgroup != found_instance->second.end()) {
957 its_handlers = found_eventgroup->second;
958 handler_found = true;
959 }
960 }
961 }
962 }
963
964 if (handler_found) {
965 if(auto its_handler = its_handlers.first) {
966 // "normal" subscription handler exists
967 _accepted_cb(its_handler(_client, _uid, _gid, _subscribed));
968 } else if(auto its_handler = its_handlers.second) {
969 // async subscription handler exists
970 its_handler(_client, _uid, _gid, _subscribed, _accepted_cb);
971 }
972 } else {
973 _accepted_cb(true);
974 }
975 }
976
register_subscription_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,subscription_handler_t _handler)977 void application_impl::register_subscription_handler(service_t _service,
978 instance_t _instance, eventgroup_t _eventgroup,
979 subscription_handler_t _handler) {
980
981 std::lock_guard<std::mutex> its_lock(subscription_mutex_);
982 subscription_[_service][_instance][_eventgroup] = std::make_pair(_handler, nullptr);
983 }
984
unregister_subscription_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup)985 void application_impl::unregister_subscription_handler(service_t _service,
986 instance_t _instance, eventgroup_t _eventgroup) {
987 std::lock_guard<std::mutex> its_lock(subscription_mutex_);
988 auto found_service = subscription_.find(_service);
989 if (found_service != subscription_.end()) {
990 auto found_instance = found_service->second.find(_instance);
991 if (found_instance != found_service->second.end()) {
992 auto found_eventgroup = found_instance->second.find(_eventgroup);
993 if (found_eventgroup != found_instance->second.end()) {
994 found_instance->second.erase(_eventgroup);
995 }
996 }
997 }
998 }
999
on_subscription_status(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,uint16_t _error)1000 void application_impl::on_subscription_status(service_t _service,
1001 instance_t _instance, eventgroup_t _eventgroup, event_t _event,
1002 uint16_t _error) {
1003 bool entry_found(false);
1004 {
1005 auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _event);
1006 std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
1007 auto its_subscription_state = subscription_state_.find(its_tuple);
1008 if (its_subscription_state == subscription_state_.end()) {
1009 its_tuple = std::make_tuple(_service, _instance, _eventgroup, ANY_EVENT);
1010 auto its_any_subscription_state = subscription_state_.find(its_tuple);
1011 if (its_any_subscription_state == subscription_state_.end()) {
1012 VSOMEIP_TRACE << std::hex << get_client( )
1013 << " application_impl::on_subscription_status: "
1014 << "Received a subscription status without subscribe for "
1015 << std::hex << _service << "/" << _instance << "/"
1016 << _eventgroup << "/" << _event << "/error=" << _error;
1017 } else {
1018 entry_found = true;
1019 }
1020 } else {
1021 entry_found = true;
1022 }
1023 if (entry_found) {
1024 if (_error) {
1025 subscription_state_[its_tuple] =
1026 subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED;
1027 } else {
1028 subscription_state_[its_tuple] =
1029 subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED;
1030 }
1031 }
1032 }
1033
1034 if (entry_found) {
1035 deliver_subscription_state(_service, _instance, _eventgroup, _event, _error);
1036 }
1037 }
1038
deliver_subscription_state(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,uint16_t _error)1039 void application_impl::deliver_subscription_state(service_t _service, instance_t _instance,
1040 eventgroup_t _eventgroup, event_t _event, uint16_t _error) {
1041 std::vector<subscription_status_handler_t> handlers;
1042 {
1043 std::lock_guard<std::mutex> its_lock(subscription_status_handlers_mutex_);
1044 auto found_service = subscription_status_handlers_.find(_service);
1045 if (found_service != subscription_status_handlers_.end()) {
1046 auto found_instance = found_service->second.find(_instance);
1047 if (found_instance != found_service->second.end()) {
1048 auto found_eventgroup = found_instance->second.find(_eventgroup);
1049 if (found_eventgroup != found_instance->second.end()) {
1050 auto found_event = found_eventgroup->second.find(_event);
1051 if (found_event != found_eventgroup->second.end()) {
1052 if (!_error || (_error && found_event->second.second)) {
1053 handlers.push_back(found_event->second.first);
1054 }
1055 } else {
1056 auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1057 if (its_any_event != found_eventgroup->second.end()) {
1058 if (!_error || (_error && its_any_event->second.second)) {
1059 handlers.push_back(its_any_event->second.first);
1060 }
1061 }
1062 }
1063 }
1064 }
1065 found_instance = found_service->second.find(ANY_INSTANCE);
1066 if (found_instance != found_service->second.end()) {
1067 auto found_eventgroup = found_instance->second.find(_eventgroup);
1068 if (found_eventgroup != found_instance->second.end()) {
1069 auto found_event = found_eventgroup->second.find(_event);
1070 if (found_event != found_eventgroup->second.end()) {
1071 if (!_error || (_error && found_event->second.second)) {
1072 handlers.push_back(found_event->second.first);
1073 }
1074 } else {
1075 auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1076 if (its_any_event != found_eventgroup->second.end()) {
1077 if (!_error || (_error && its_any_event->second.second)) {
1078 handlers.push_back(its_any_event->second.first);
1079 }
1080 }
1081 }
1082 }
1083 }
1084 }
1085 found_service = subscription_status_handlers_.find(ANY_SERVICE);
1086 if (found_service != subscription_status_handlers_.end()) {
1087 auto found_instance = found_service->second.find(_instance);
1088 if (found_instance != found_service->second.end()) {
1089 auto found_eventgroup = found_instance->second.find(_eventgroup);
1090 if (found_eventgroup != found_instance->second.end()) {
1091 auto found_event = found_eventgroup->second.find(_event);
1092 if (found_event != found_eventgroup->second.end()) {
1093 if (!_error || (_error && found_event->second.second)) {
1094 handlers.push_back(found_event->second.first);
1095 }
1096 } else {
1097 auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1098 if (its_any_event != found_eventgroup->second.end()) {
1099 if (!_error || (_error && its_any_event->second.second)) {
1100 handlers.push_back(its_any_event->second.first);
1101 }
1102 }
1103 }
1104 }
1105 }
1106 found_instance = found_service->second.find(ANY_INSTANCE);
1107 if (found_instance != found_service->second.end()) {
1108 auto found_eventgroup = found_instance->second.find(_eventgroup);
1109 if (found_eventgroup != found_instance->second.end()) {
1110 auto found_event = found_eventgroup->second.find(_event);
1111 if (found_event != found_eventgroup->second.end()) {
1112 if (!_error || (_error && found_event->second.second)) {
1113 handlers.push_back(found_event->second.first);
1114 }
1115 } else {
1116 auto its_any_event = found_eventgroup->second.find(ANY_EVENT);
1117 if (its_any_event != found_eventgroup->second.end()) {
1118 if (!_error || (_error && its_any_event->second.second)) {
1119 handlers.push_back(its_any_event->second.first);
1120 }
1121 }
1122 }
1123 }
1124 }
1125 }
1126 }
1127 {
1128 std::unique_lock<std::mutex> handlers_lock(handlers_mutex_);
1129 for (auto &handler : handlers) {
1130 std::shared_ptr<sync_handler> its_sync_handler
1131 = std::make_shared<sync_handler>([handler, _service,
1132 _instance, _eventgroup,
1133 _event, _error]() {
1134 handler(_service, _instance,
1135 _eventgroup, _event, _error);
1136 });
1137 its_sync_handler->handler_type_ = handler_type_e::SUBSCRIPTION;
1138 its_sync_handler->service_id_ = _service;
1139 its_sync_handler->instance_id_ = _instance;
1140 its_sync_handler->method_id_ = _event;
1141 its_sync_handler->eventgroup_id_ = _eventgroup;
1142 handlers_.push_back(its_sync_handler);
1143 }
1144 if (handlers.size()) {
1145 dispatcher_condition_.notify_one();
1146 }
1147 }
1148 }
1149
register_subscription_status_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event,subscription_status_handler_t _handler,bool _is_selective)1150 void application_impl::register_subscription_status_handler(service_t _service,
1151 instance_t _instance, eventgroup_t _eventgroup, event_t _event,
1152 subscription_status_handler_t _handler, bool _is_selective) {
1153 std::lock_guard<std::mutex> its_lock(subscription_status_handlers_mutex_);
1154 if (_handler) {
1155 subscription_status_handlers_[_service][_instance][_eventgroup][_event] =
1156 std::make_pair(_handler, _is_selective);
1157 } else {
1158 VSOMEIP_WARNING <<
1159 "application_impl::register_subscription_status_handler: "
1160 "_handler is null, for unregistration please use "
1161 "application_impl::unregister_subscription_status_handler ["
1162 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1163 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
1164 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "."
1165 << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
1166 }
1167 }
1168
unregister_subscription_status_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)1169 void application_impl::unregister_subscription_status_handler(service_t _service,
1170 instance_t _instance, eventgroup_t _eventgroup, event_t _event) {
1171 std::lock_guard<std::mutex> its_lock(subscription_status_handlers_mutex_);
1172 auto its_service = subscription_status_handlers_.find(_service);
1173 if (its_service != subscription_status_handlers_.end()) {
1174 auto its_instance = its_service->second.find(_instance);
1175 if (its_instance != its_service->second.end()) {
1176 auto its_eventgroup = its_instance->second.find(_eventgroup);
1177 if (its_eventgroup != its_instance->second.end()) {
1178 its_eventgroup->second.erase(_event);
1179 if (its_eventgroup->second.empty()) {
1180 its_instance->second.erase(_eventgroup);
1181 if (its_instance->second.empty()) {
1182 its_service->second.erase(_instance);
1183 if (its_service->second.empty()) {
1184 subscription_status_handlers_.erase(_service);
1185 }
1186 }
1187 }
1188 }
1189 }
1190 }
1191 }
1192
register_message_handler(service_t _service,instance_t _instance,method_t _method,message_handler_t _handler)1193 void application_impl::register_message_handler(service_t _service,
1194 instance_t _instance, method_t _method, message_handler_t _handler) {
1195 std::lock_guard<std::mutex> its_lock(members_mutex_);
1196 members_[_service][_instance][_method] = _handler;
1197 }
1198
unregister_message_handler(service_t _service,instance_t _instance,method_t _method)1199 void application_impl::unregister_message_handler(service_t _service,
1200 instance_t _instance, method_t _method) {
1201 std::lock_guard<std::mutex> its_lock(members_mutex_);
1202 auto found_service = members_.find(_service);
1203 if (found_service != members_.end()) {
1204 auto found_instance = found_service->second.find(_instance);
1205 if (found_instance != found_service->second.end()) {
1206 auto found_method = found_instance->second.find(_method);
1207 if (found_method != found_instance->second.end()) {
1208 found_instance->second.erase(_method);
1209 }
1210 }
1211 }
1212 }
1213
offer_event(service_t _service,instance_t _instance,event_t _notifier,const std::set<eventgroup_t> & _eventgroups,event_type_e _type,std::chrono::milliseconds _cycle,bool _change_resets_cycle,bool _update_on_change,const epsilon_change_func_t & _epsilon_change_func,reliability_type_e _reliability)1214 void application_impl::offer_event(service_t _service, instance_t _instance,
1215 event_t _notifier, const std::set<eventgroup_t> &_eventgroups,
1216 event_type_e _type,
1217 std::chrono::milliseconds _cycle, bool _change_resets_cycle,
1218 bool _update_on_change,
1219 const epsilon_change_func_t &_epsilon_change_func,
1220 reliability_type_e _reliability) {
1221 if (routing_)
1222 routing_->register_event(client_,
1223 _service, _instance,
1224 _notifier, _eventgroups, _type, _reliability,
1225 _cycle, _change_resets_cycle, _update_on_change,
1226 _epsilon_change_func, true);
1227 }
1228
stop_offer_event(service_t _service,instance_t _instance,event_t _event)1229 void application_impl::stop_offer_event(service_t _service, instance_t _instance,
1230 event_t _event) {
1231 if (routing_)
1232 routing_->unregister_event(client_, _service, _instance, _event, true);
1233 }
1234
request_event(service_t _service,instance_t _instance,event_t _event,const std::set<eventgroup_t> & _eventgroups,event_type_e _type,reliability_type_e _reliability)1235 void application_impl::request_event(service_t _service, instance_t _instance,
1236 event_t _event, const std::set<eventgroup_t> &_eventgroups,
1237 event_type_e _type, reliability_type_e _reliability) {
1238 if (routing_)
1239 routing_->register_event(client_,
1240 _service, _instance,
1241 _event, _eventgroups, _type, _reliability,
1242 std::chrono::milliseconds::zero(), false, true,
1243 nullptr,
1244 false);
1245 }
1246
release_event(service_t _service,instance_t _instance,event_t _event)1247 void application_impl::release_event(service_t _service, instance_t _instance,
1248 event_t _event) {
1249 if (routing_)
1250 routing_->unregister_event(client_, _service, _instance, _event, false);
1251 }
1252
1253 // Interface "routing_manager_host"
get_name() const1254 const std::string & application_impl::get_name() const {
1255 return name_;
1256 }
1257
get_client() const1258 client_t application_impl::get_client() const {
1259 return client_;
1260 }
1261
set_client(const client_t & _client)1262 void application_impl::set_client(const client_t &_client) {
1263 client_ = _client;
1264 }
1265
get_session()1266 session_t application_impl::get_session() {
1267
1268 #ifdef VSOMEIP_HAS_SESSION_HANDLING_CONFIG
1269 if (!has_session_handling_)
1270 return (0);
1271 #endif // VSOMEIP_HAS_SESSION_HANDLING_CONFIG
1272
1273 std::lock_guard<std::mutex> its_lock(session_mutex_);
1274 if (0 == ++session_) {
1275 // Smallest allowed session identifier
1276 session_ = 1;
1277 }
1278
1279 return session_;
1280 }
1281
get_configuration() const1282 std::shared_ptr<configuration> application_impl::get_configuration() const {
1283 return configuration_;
1284 }
1285
get_diagnosis() const1286 diagnosis_t application_impl::get_diagnosis() const {
1287 return configuration_->get_diagnosis_address();
1288 }
1289
get_io()1290 boost::asio::io_service & application_impl::get_io() {
1291 return io_;
1292 }
1293
on_state(state_type_e _state)1294 void application_impl::on_state(state_type_e _state) {
1295 {
1296 std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
1297 if (state_ != _state) {
1298 state_ = _state;
1299 if (state_ == state_type_e::ST_REGISTERED) {
1300 for (const auto &its_service : availability_) {
1301 for (const auto &its_instance : its_service.second) {
1302 for (const auto &its_major : its_instance.second) {
1303 for (const auto &its_minor : its_major.second) {
1304 if (!its_minor.second.second) {
1305 do_register_availability_handler(
1306 its_service.first,
1307 its_instance.first,
1308 its_minor.second.first,
1309 its_major.first, its_minor.first);
1310 }
1311 }
1312 }
1313 }
1314 }
1315 } else {
1316 // Call on_availability callback on each service
1317 for (const auto &its_service : availability_) {
1318 for (const auto &its_instance : its_service.second) {
1319 for (const auto &its_major : its_instance.second) {
1320 for (const auto &its_minor : its_major.second) {
1321 on_availability(its_service.first, its_instance.first, false, its_major.first, its_minor.first);
1322 }
1323 }
1324 }
1325 }
1326 }
1327 }
1328 }
1329 bool has_state_handler(false);
1330 state_handler_t handler = nullptr;
1331 {
1332 std::lock_guard<std::mutex> its_lock(state_handler_mutex_);
1333 if (handler_) {
1334 has_state_handler = true;
1335 handler = handler_;
1336 }
1337 }
1338 if (has_state_handler) {
1339 std::lock_guard<std::mutex> its_lock(handlers_mutex_);
1340 std::shared_ptr<sync_handler> its_sync_handler
1341 = std::make_shared<sync_handler>([handler, _state]() {
1342 handler(_state);
1343 });
1344 its_sync_handler->handler_type_ = handler_type_e::STATE;
1345 handlers_.push_back(its_sync_handler);
1346 dispatcher_condition_.notify_one();
1347 }
1348 }
1349
on_availability(service_t _service,instance_t _instance,bool _is_available,major_version_t _major,minor_version_t _minor)1350 void application_impl::on_availability(service_t _service, instance_t _instance,
1351 bool _is_available, major_version_t _major, minor_version_t _minor) {
1352 std::vector<availability_handler_t> its_handlers;
1353 {
1354 std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
1355 if (_is_available == is_available_unlocked(_service, _instance, _major, _minor)) {
1356 return;
1357 }
1358
1359 if (_is_available) {
1360 available_[_service][_instance][_major] = _minor;
1361 } else {
1362 auto found_available_service = available_.find(_service);
1363 if (found_available_service != available_.end()) {
1364 auto found_instance = found_available_service->second.find(_instance);
1365 if( found_instance != found_available_service->second.end()) {
1366 auto found_major = found_instance->second.find(_major);
1367 if( found_major != found_instance->second.end() ){
1368 if( _minor == found_major->second)
1369 found_available_service->second.erase(_instance);
1370 }
1371 }
1372 }
1373 }
1374
1375 auto find_matching_handler =
1376 [&](const availability_major_minor_t& _av_ma_mi_it) {
1377 auto found_major = _av_ma_mi_it.find(_major);
1378 if (found_major != _av_ma_mi_it.end()) {
1379 for (std::int32_t mi = static_cast<std::int32_t>(_minor); mi >= 0; mi--) {
1380 const auto found_minor = found_major->second.find(static_cast<minor_version_t>(mi));
1381 if (found_minor != found_major->second.end()) {
1382 its_handlers.push_back(found_minor->second.first);
1383 }
1384 }
1385 const auto found_any_minor = found_major->second.find(ANY_MINOR);
1386 if (found_any_minor != found_major->second.end()) {
1387 its_handlers.push_back(found_any_minor->second.first);
1388 }
1389 }
1390 found_major = _av_ma_mi_it.find(ANY_MAJOR);
1391 if (found_major != _av_ma_mi_it.end()) {
1392 for (std::int32_t mi = static_cast<std::int32_t>(_minor); mi >= 0; mi--) {
1393 const auto found_minor = found_major->second.find(static_cast<minor_version_t>(mi));
1394 if (found_minor != found_major->second.end()) {
1395 its_handlers.push_back(found_minor->second.first);
1396 }
1397 }
1398 const auto found_any_minor = found_major->second.find(ANY_MINOR);
1399 if (found_any_minor != found_major->second.end()) {
1400 its_handlers.push_back(found_any_minor->second.first);
1401 }
1402 }
1403 };
1404
1405 auto found_service = availability_.find(_service);
1406 if (found_service != availability_.end()) {
1407 auto found_instance = found_service->second.find(_instance);
1408 if (found_instance != found_service->second.end()) {
1409 find_matching_handler(found_instance->second);
1410 }
1411 found_instance = found_service->second.find(ANY_INSTANCE);
1412 if (found_instance != found_service->second.end()) {
1413 find_matching_handler(found_instance->second);
1414 }
1415 }
1416 found_service = availability_.find(ANY_SERVICE);
1417 if (found_service != availability_.end()) {
1418 auto found_instance = found_service->second.find(_instance);
1419 if( found_instance != found_service->second.end()) {
1420 find_matching_handler(found_instance->second);
1421 }
1422 found_instance = found_service->second.find(ANY_INSTANCE);
1423 if( found_instance != found_service->second.end()) {
1424 find_matching_handler(found_instance->second);
1425 }
1426 }
1427 {
1428 std::lock_guard<std::mutex> handlers_lock(handlers_mutex_);
1429 for (const auto &handler : its_handlers) {
1430 std::shared_ptr<sync_handler> its_sync_handler =
1431 std::make_shared<sync_handler>(
1432 [handler, _service, _instance, _is_available]()
1433 {
1434 handler(_service, _instance, _is_available);
1435 });
1436 its_sync_handler->handler_type_ = handler_type_e::AVAILABILITY;
1437 its_sync_handler->service_id_ = _service;
1438 its_sync_handler->instance_id_ = _instance;
1439 handlers_.push_back(its_sync_handler);
1440 }
1441 }
1442 }
1443 if (!_is_available) {
1444 {
1445 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
1446 auto found_service = subscriptions_.find(_service);
1447 if (found_service != subscriptions_.end()) {
1448 auto found_instance = found_service->second.find(_instance);
1449 if (found_instance != found_service->second.end()) {
1450 for (auto &event : found_instance->second) {
1451 for (auto &eventgroup : event.second) {
1452 eventgroup.second = false;
1453 }
1454 }
1455 }
1456 }
1457 }
1458 {
1459 std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
1460 for (auto &its_subscription_state : subscription_state_) {
1461 if (std::get<0>(its_subscription_state.first) == _service &&
1462 std::get<1>(its_subscription_state.first) == _instance) {
1463 its_subscription_state.second =
1464 subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED;
1465 }
1466 }
1467 }
1468 }
1469
1470 if (its_handlers.size()) {
1471 std::lock_guard<std::mutex> handlers_lock(handlers_mutex_);
1472 dispatcher_condition_.notify_one();
1473 }
1474 }
1475
on_message(std::shared_ptr<message> && _message)1476 void application_impl::on_message(std::shared_ptr<message> &&_message) {
1477 const service_t its_service = _message->get_service();
1478 const instance_t its_instance = _message->get_instance();
1479 const method_t its_method = _message->get_method();
1480
1481 if (_message->get_message_type() == message_type_e::MT_NOTIFICATION) {
1482 if (!check_for_active_subscription(its_service, its_instance,
1483 static_cast<event_t>(its_method))) {
1484 VSOMEIP_ERROR << "application_impl::on_message ["
1485 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
1486 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
1487 << std::hex << std::setw(4) << std::setfill('0') << its_method << "]";
1488 return;
1489 }
1490 }
1491
1492 {
1493 std::lock_guard<std::mutex> its_lock(members_mutex_);
1494 std::set<message_handler> its_handlers;
1495 auto found_service = members_.find(its_service);
1496 if (found_service != members_.end()) {
1497 auto found_instance = found_service->second.find(its_instance);
1498 if (found_instance != found_service->second.end()) {
1499 auto found_method = found_instance->second.find(its_method);
1500 if (found_method != found_instance->second.end()) {
1501 its_handlers.insert(found_method->second);
1502 }
1503 auto found_any_method = found_instance->second.find(ANY_METHOD);
1504 if (found_any_method != found_instance->second.end()) {
1505 its_handlers.insert(found_any_method->second);
1506 }
1507 }
1508 auto found_any_instance = found_service->second.find(ANY_INSTANCE);
1509 if (found_any_instance != found_service->second.end()) {
1510 auto found_method = found_any_instance->second.find(its_method);
1511 if (found_method != found_any_instance->second.end()) {
1512 its_handlers.insert(found_method->second);
1513 }
1514 auto found_any_method = found_any_instance->second.find(ANY_METHOD);
1515 if (found_any_method != found_any_instance->second.end()) {
1516 its_handlers.insert(found_any_method->second);
1517 }
1518 }
1519 }
1520 auto found_any_service = members_.find(ANY_SERVICE);
1521 if (found_any_service != members_.end()) {
1522 auto found_instance = found_any_service->second.find(its_instance);
1523 if (found_instance != found_any_service->second.end()) {
1524 auto found_method = found_instance->second.find(its_method);
1525 if (found_method != found_instance->second.end()) {
1526 its_handlers.insert(found_method->second);
1527 }
1528 auto found_any_method = found_instance->second.find(ANY_METHOD);
1529 if (found_any_method != found_instance->second.end()) {
1530 its_handlers.insert(found_any_method->second);
1531 }
1532 }
1533 auto found_any_instance = found_any_service->second.find(ANY_INSTANCE);
1534 if (found_any_instance != found_any_service->second.end()) {
1535 auto found_method = found_any_instance->second.find(its_method);
1536 if (found_method != found_any_instance->second.end()) {
1537 its_handlers.insert(found_method->second);
1538 }
1539 auto found_any_method = found_any_instance->second.find(ANY_METHOD);
1540 if (found_any_method != found_any_instance->second.end()) {
1541 its_handlers.insert(found_any_method->second);
1542 }
1543 }
1544 }
1545
1546 if (its_handlers.size()) {
1547 std::lock_guard<std::mutex> its_lock(handlers_mutex_);
1548 for (const auto &its_handler : its_handlers) {
1549 auto handler = its_handler.handler_;
1550 std::shared_ptr<sync_handler> its_sync_handler =
1551 std::make_shared<sync_handler>([handler, _message]() {
1552 handler(_message);
1553 });
1554 its_sync_handler->handler_type_ = handler_type_e::MESSAGE;
1555 its_sync_handler->service_id_ = _message->get_service();
1556 its_sync_handler->instance_id_ = _message->get_instance();
1557 its_sync_handler->method_id_ = _message->get_method();
1558 its_sync_handler->session_id_ = _message->get_session();
1559 handlers_.push_back(its_sync_handler);
1560 }
1561 dispatcher_condition_.notify_one();
1562 }
1563 }
1564 }
1565
1566 // Interface "service_discovery_host"
get_routing_manager() const1567 routing_manager * application_impl::get_routing_manager() const {
1568 return routing_.get();
1569 }
1570
main_dispatch()1571 void application_impl::main_dispatch() {
1572 #ifndef _WIN32
1573 {
1574 std::stringstream s;
1575 s << std::hex << std::setw(4) << std::setfill('0')
1576 << client_ << "_m_dispatch";
1577 pthread_setname_np(pthread_self(),s.str().c_str());
1578 }
1579 #endif
1580 const std::thread::id its_id = std::this_thread::get_id();
1581 VSOMEIP_INFO << "main dispatch thread id from application: "
1582 << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
1583 << name_ << ") is: " << std::hex << its_id
1584 #ifndef _WIN32
1585 << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
1586 #endif
1587 ;
1588 std::unique_lock<std::mutex> its_lock(handlers_mutex_);
1589 while (is_dispatching_) {
1590 if (handlers_.empty() || !is_active_dispatcher(its_id)) {
1591 // Cancel other waiting dispatcher
1592 dispatcher_condition_.notify_all();
1593 // Wait for new handlers to execute
1594 while (is_dispatching_ && (handlers_.empty() || !is_active_dispatcher(its_id))) {
1595 dispatcher_condition_.wait(its_lock);
1596 }
1597 } else {
1598 std::shared_ptr<sync_handler> its_handler;
1599 while (is_dispatching_ && is_active_dispatcher(its_id)
1600 && (its_handler = get_next_handler())) {
1601 its_lock.unlock();
1602 invoke_handler(its_handler);
1603
1604 if (!is_dispatching_)
1605 return;
1606
1607 its_lock.lock();
1608
1609 reschedule_availability_handler(its_handler);
1610 remove_elapsed_dispatchers();
1611
1612 #ifdef _WIN32
1613 if(!is_dispatching_) {
1614 its_lock.unlock();
1615 return;
1616 }
1617 #endif
1618 }
1619 }
1620 }
1621 its_lock.unlock();
1622 }
1623
dispatch()1624 void application_impl::dispatch() {
1625 #ifndef _WIN32
1626 {
1627 std::stringstream s;
1628 s << std::hex << std::setw(4) << std::setfill('0')
1629 << client_ << "_dispatch";
1630 pthread_setname_np(pthread_self(),s.str().c_str());
1631 }
1632 #endif
1633 const std::thread::id its_id = std::this_thread::get_id();
1634 VSOMEIP_INFO << "dispatch thread id from application: "
1635 << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
1636 << name_ << ") is: " << std::hex << its_id
1637 #ifndef _WIN32
1638 << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
1639 #endif
1640 ;
1641 std::unique_lock<std::mutex> its_lock(handlers_mutex_);
1642 while (is_active_dispatcher(its_id)) {
1643 if (is_dispatching_ && handlers_.empty()) {
1644 dispatcher_condition_.wait(its_lock);
1645 // Maybe woken up from main dispatcher
1646 if (handlers_.empty() && !is_active_dispatcher(its_id)) {
1647 if (!is_dispatching_) {
1648 return;
1649 }
1650 std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
1651 elapsed_dispatchers_.insert(its_id);
1652 return;
1653 }
1654 } else {
1655 std::shared_ptr<sync_handler> its_handler;
1656 while (is_dispatching_ && is_active_dispatcher(its_id)
1657 && (its_handler = get_next_handler())) {
1658 its_lock.unlock();
1659 invoke_handler(its_handler);
1660
1661 if (!is_dispatching_)
1662 return;
1663
1664 its_lock.lock();
1665
1666 reschedule_availability_handler(its_handler);
1667 remove_elapsed_dispatchers();
1668 }
1669 }
1670 }
1671 if (is_dispatching_) {
1672 std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
1673 elapsed_dispatchers_.insert(its_id);
1674 }
1675 dispatcher_condition_.notify_all();
1676 }
1677
get_next_handler()1678 std::shared_ptr<application_impl::sync_handler> application_impl::get_next_handler() {
1679 std::shared_ptr<sync_handler> its_next_handler;
1680 while (!handlers_.empty() && !its_next_handler) {
1681 its_next_handler = handlers_.front();
1682 handlers_.pop_front();
1683
1684 // Check handler
1685 if (its_next_handler->handler_type_ == handler_type_e::AVAILABILITY) {
1686 const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
1687 its_next_handler->service_id_,
1688 its_next_handler->instance_id_);
1689 auto found_si = availability_handlers_.find(its_si_pair);
1690 if (found_si != availability_handlers_.end()
1691 && !found_si->second.empty()
1692 && found_si->second.front() != its_next_handler) {
1693 found_si->second.push_back(its_next_handler);
1694 // There is a running availability handler for this service.
1695 // Therefore, this one must wait...
1696 its_next_handler = nullptr;
1697 } else {
1698 availability_handlers_[its_si_pair].push_back(its_next_handler);
1699 }
1700 } else if (its_next_handler->handler_type_ == handler_type_e::MESSAGE) {
1701 const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
1702 its_next_handler->service_id_,
1703 its_next_handler->instance_id_);
1704 auto found_si = availability_handlers_.find(its_si_pair);
1705 if (found_si != availability_handlers_.end()
1706 && found_si->second.size() > 1) {
1707 // The message comes after the next availability handler
1708 // Therefore, queue it to the last one
1709 found_si->second.push_back(its_next_handler);
1710 its_next_handler = nullptr;
1711 }
1712 }
1713 }
1714
1715 return its_next_handler;
1716 }
1717
reschedule_availability_handler(const std::shared_ptr<sync_handler> & _handler)1718 void application_impl::reschedule_availability_handler(
1719 const std::shared_ptr<sync_handler> &_handler) {
1720 if (_handler->handler_type_ == handler_type_e::AVAILABILITY) {
1721 const std::pair<service_t, instance_t> its_si_pair = std::make_pair(
1722 _handler->service_id_, _handler->instance_id_);
1723 auto found_si = availability_handlers_.find(its_si_pair);
1724 if (found_si != availability_handlers_.end()) {
1725 if (!found_si->second.empty()
1726 && found_si->second.front() == _handler) {
1727 found_si->second.pop_front();
1728
1729 // If there are other availability handlers pending, schedule
1730 // them and all handlers that were queued because of them
1731 for (auto it = found_si->second.rbegin();
1732 it != found_si->second.rend(); it++) {
1733 handlers_.push_front(*it);
1734 }
1735 availability_handlers_.erase(found_si);
1736 }
1737 return;
1738 }
1739 VSOMEIP_WARNING << __func__
1740 << ": An unknown availability handler returned!";
1741 }
1742 }
1743
invoke_handler(std::shared_ptr<sync_handler> & _handler)1744 void application_impl::invoke_handler(std::shared_ptr<sync_handler> &_handler) {
1745 const std::thread::id its_id = std::this_thread::get_id();
1746
1747 std::shared_ptr<sync_handler> its_sync_handler
1748 = std::make_shared<sync_handler>(_handler->service_id_,
1749 _handler->instance_id_, _handler->method_id_,
1750 _handler->session_id_, _handler->eventgroup_id_,
1751 _handler->handler_type_);
1752
1753 boost::asio::steady_timer its_dispatcher_timer(io_);
1754 its_dispatcher_timer.expires_from_now(std::chrono::milliseconds(max_dispatch_time_));
1755 its_dispatcher_timer.async_wait([this, its_sync_handler](const boost::system::error_code &_error) {
1756 if (!_error) {
1757 print_blocking_call(its_sync_handler);
1758 if (has_active_dispatcher()) {
1759 std::lock_guard<std::mutex> its_lock(handlers_mutex_);
1760 dispatcher_condition_.notify_all();
1761 } else {
1762 // If possible, create a new dispatcher thread to unblock.
1763 // If this is _not_ possible, dispatching is blocked until
1764 // at least one of the active handler calls returns.
1765 while (is_dispatching_) {
1766 if (dispatcher_mutex_.try_lock()) {
1767 if (dispatchers_.size() < max_dispatchers_) {
1768 if (is_dispatching_) {
1769 auto its_dispatcher = std::make_shared<std::thread>(
1770 std::bind(&application_impl::dispatch, shared_from_this()));
1771 dispatchers_[its_dispatcher->get_id()] = its_dispatcher;
1772 } else {
1773 VSOMEIP_INFO << "Won't start new dispatcher "
1774 "thread as Client=" << std::hex
1775 << get_client() << " is shutting down";
1776 }
1777 } else {
1778 VSOMEIP_ERROR << "Maximum number of dispatchers exceeded.";
1779 }
1780 dispatcher_mutex_.unlock();
1781 break;
1782 } else {
1783 std::this_thread::yield();
1784 }
1785 }
1786 }
1787 }
1788 });
1789 if (client_side_logging_
1790 && (client_side_logging_filter_.empty()
1791 || (1 == client_side_logging_filter_.count(std::make_tuple(its_sync_handler->service_id_, ANY_INSTANCE)))
1792 || (1 == client_side_logging_filter_.count(std::make_tuple(its_sync_handler->service_id_, its_sync_handler->instance_id_))))) {
1793 VSOMEIP_INFO << "Invoking handler: ("
1794 << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
1795 << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->service_id_ << "."
1796 << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->instance_id_ << "."
1797 << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->method_id_ << ":"
1798 << std::hex << std::setw(4) << std::setfill('0') << its_sync_handler->session_id_ << "] "
1799 << "type=" << static_cast<std::uint32_t>(its_sync_handler->handler_type_)
1800 << " thread=" << std::hex << its_id;
1801 }
1802
1803 while (is_dispatching_ ) {
1804 if (dispatcher_mutex_.try_lock()) {
1805 running_dispatchers_.insert(its_id);
1806 dispatcher_mutex_.unlock();
1807 break;
1808 }
1809 std::this_thread::yield();
1810 }
1811
1812 if (is_dispatching_) {
1813 try {
1814 _handler->handler_();
1815 } catch (const std::exception &e) {
1816 VSOMEIP_ERROR << "application_impl::invoke_handler caught exception: "
1817 << e.what();
1818 print_blocking_call(its_sync_handler);
1819 }
1820 }
1821 boost::system::error_code ec;
1822 its_dispatcher_timer.cancel(ec);
1823
1824 while (is_dispatching_ ) {
1825 if (dispatcher_mutex_.try_lock()) {
1826 running_dispatchers_.erase(its_id);
1827 dispatcher_mutex_.unlock();
1828 return;
1829 }
1830 std::this_thread::yield();
1831 }
1832 }
1833
has_active_dispatcher()1834 bool application_impl::has_active_dispatcher() {
1835 while (is_dispatching_) {
1836 if (dispatcher_mutex_.try_lock()) {
1837 for (const auto &d : dispatchers_) {
1838 if (running_dispatchers_.find(d.first) == running_dispatchers_.end() &&
1839 elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
1840 dispatcher_mutex_.unlock();
1841 return true;
1842 }
1843 }
1844 dispatcher_mutex_.unlock();
1845 return false;
1846 }
1847 std::this_thread::yield();
1848 }
1849 return false;
1850 }
1851
is_active_dispatcher(const std::thread::id & _id)1852 bool application_impl::is_active_dispatcher(const std::thread::id &_id) {
1853 while (is_dispatching_) {
1854 if (dispatcher_mutex_.try_lock()) {
1855 for (const auto &d : dispatchers_) {
1856 if (d.first != _id &&
1857 running_dispatchers_.find(d.first) == running_dispatchers_.end() &&
1858 elapsed_dispatchers_.find(d.first) == elapsed_dispatchers_.end()) {
1859 dispatcher_mutex_.unlock();
1860 return false;
1861 }
1862 }
1863 dispatcher_mutex_.unlock();
1864 return true;
1865 }
1866 std::this_thread::yield();
1867 }
1868 return false;
1869 }
1870
remove_elapsed_dispatchers()1871 void application_impl::remove_elapsed_dispatchers() {
1872 if (is_dispatching_) {
1873 std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
1874 for (auto id : elapsed_dispatchers_) {
1875 auto its_dispatcher = dispatchers_.find(id);
1876 if (its_dispatcher->second->joinable())
1877 its_dispatcher->second->join();
1878 dispatchers_.erase(id);
1879 }
1880 elapsed_dispatchers_.clear();
1881 }
1882 }
1883
clear_all_handler()1884 void application_impl::clear_all_handler() {
1885 unregister_state_handler();
1886 {
1887 std::lock_guard<std::mutex> its_lock(offered_services_handler_mutex_);
1888 offered_services_handler_ = nullptr;
1889 }
1890
1891 {
1892 std::lock_guard<std::recursive_mutex> availability_lock(availability_mutex_);
1893 availability_.clear();
1894 }
1895
1896 {
1897 std::lock_guard<std::mutex> its_lock(subscription_mutex_);
1898 subscription_.clear();
1899 }
1900
1901 {
1902 std::lock_guard<std::mutex> its_lock(subscription_error_mutex_);
1903 eventgroup_error_handlers_.clear();
1904 }
1905
1906 {
1907 std::lock_guard<std::mutex> its_lock(members_mutex_);
1908 members_.clear();
1909 }
1910 {
1911 std::lock_guard<std::mutex> its_lock(handlers_mutex_);
1912 handlers_.clear();
1913 }
1914 }
1915
shutdown()1916 void application_impl::shutdown() {
1917 VSOMEIP_INFO << "shutdown thread id from application: "
1918 << std::hex << std::setw(4) << std::setfill('0') << client_ << " ("
1919 << name_ << ") is: " << std::hex << std::this_thread::get_id()
1920 #ifndef _WIN32
1921 << " TID: " << std::dec << static_cast<int>(syscall(SYS_gettid))
1922 #endif
1923 ;
1924 #ifndef _WIN32
1925 boost::asio::detail::posix_signal_blocker blocker;
1926 {
1927 std::stringstream s;
1928 s << std::hex << std::setw(4) << std::setfill('0')
1929 << client_ << "_shutdown";
1930 pthread_setname_np(pthread_self(),s.str().c_str());
1931 }
1932 #endif
1933
1934 {
1935 std::unique_lock<std::mutex> its_lock(start_stop_mutex_);
1936 while(!stopped_) {
1937 stop_cv_.wait(its_lock);
1938 }
1939 }
1940 {
1941 std::lock_guard<std::mutex> its_handler_lock(handlers_mutex_);
1942 is_dispatching_ = false;
1943 dispatcher_condition_.notify_all();
1944 }
1945
1946 try {
1947 std::lock_guard<std::mutex> its_lock(dispatcher_mutex_);
1948 for (const auto& its_dispatcher : dispatchers_) {
1949 if (its_dispatcher.second->get_id() != stop_caller_id_) {
1950 if (its_dispatcher.second->joinable()) {
1951 its_dispatcher.second->join();
1952 }
1953 } else {
1954 // If the caller of stop() is one of our dispatchers
1955 // it can happen the shutdown mechanism will block
1956 // as that thread probably can't be joined. The reason
1957 // is the caller of stop() probably wants to join the
1958 // thread once call start (which got to the IO-Thread)
1959 // and which is expected to return after stop() has been
1960 // called.
1961 // Therefore detach this thread instead of joining because
1962 // after it will return to "main_dispatch" it will be
1963 // properly shutdown anyways because "is_dispatching_"
1964 // was set to "false" here.
1965 its_dispatcher.second->detach();
1966 }
1967 }
1968 availability_handlers_.clear();
1969 running_dispatchers_.clear();
1970 elapsed_dispatchers_.clear();
1971 dispatchers_.clear();
1972 } catch (const std::exception &e) {
1973 VSOMEIP_ERROR << "application_impl::" << __func__ << ": stopping dispatchers, "
1974 << " catched exception: " << e.what();
1975 }
1976
1977 try {
1978 if (routing_)
1979 routing_->stop();
1980 } catch (const std::exception &e) {
1981 VSOMEIP_ERROR << "application_impl::" << __func__ << ": stopping routing, "
1982 << " catched exception: " << e.what();
1983 }
1984
1985 try {
1986 work_.reset();
1987 io_.stop();
1988 } catch (const std::exception &e) {
1989 VSOMEIP_ERROR << "application_impl::" << __func__ << ": stopping io, "
1990 << " catched exception: " << e.what();
1991 }
1992
1993 try {
1994 std::lock_guard<std::mutex> its_lock_start_stop(start_stop_mutex_);
1995 for (const auto& t : io_threads_) {
1996 if (t->joinable()) {
1997 t->join();
1998 }
1999 }
2000 io_threads_.clear();
2001 } catch (const std::exception &e) {
2002 VSOMEIP_ERROR << "application_impl::" << __func__ << ": joining threads, "
2003 << " catched exception: " << e.what();
2004 }
2005 }
2006
is_routing() const2007 bool application_impl::is_routing() const {
2008 return is_routing_manager_host_;
2009 }
2010
send_back_cached_event(service_t _service,instance_t _instance,event_t _event)2011 void application_impl::send_back_cached_event(service_t _service,
2012 instance_t _instance,
2013 event_t _event) {
2014 std::shared_ptr<event> its_event = routing_->find_event(_service,
2015 _instance, _event);
2016 if (its_event && its_event->is_field() && its_event->is_set()) {
2017 std::shared_ptr<message> its_message = runtime_->create_notification();
2018 its_message->set_service(_service);
2019 its_message->set_method(_event);
2020 its_message->set_instance(_instance);
2021 its_message->set_payload(its_event->get_payload());
2022 its_message->set_initial(true);
2023 on_message(std::move(its_message));
2024 VSOMEIP_INFO << "Sending back cached event ("
2025 << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
2026 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2027 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2028 << std::hex << std::setw(4) << std::setfill('0') << _event << "]";
2029 }
2030 }
2031
send_back_cached_eventgroup(service_t _service,instance_t _instance,eventgroup_t _eventgroup)2032 void application_impl::send_back_cached_eventgroup(service_t _service,
2033 instance_t _instance,
2034 eventgroup_t _eventgroup) {
2035 std::set<std::shared_ptr<event>> its_events = routing_->find_events(_service, _instance,
2036 _eventgroup);
2037 for(const auto &its_event : its_events) {
2038 if (its_event && its_event->is_field() && its_event->is_set()) {
2039 std::shared_ptr<message> its_message = runtime_->create_notification();
2040 const event_t its_event_id(its_event->get_event());
2041 its_message->set_service(_service);
2042 its_message->set_method(its_event_id);
2043 its_message->set_instance(_instance);
2044 its_message->set_payload(its_event->get_payload());
2045 its_message->set_initial(true);
2046 on_message(std::move(its_message));
2047 VSOMEIP_INFO << "Sending back cached event ("
2048 << std::hex << std::setw(4) << std::setfill('0') << client_ <<"): ["
2049 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2050 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2051 << std::hex << std::setw(4) << std::setfill('0') << its_event_id
2052 << "] from eventgroup "
2053 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup;
2054 }
2055 }
2056 }
2057
set_routing_state(routing_state_e _routing_state)2058 void application_impl::set_routing_state(routing_state_e _routing_state) {
2059 if (routing_)
2060 routing_->set_routing_state(_routing_state);
2061 }
2062
check_send_back_cached_event(service_t _service,instance_t _instance,event_t _event,eventgroup_t _eventgroup,bool * _send_back_cached_event,bool * _send_back_cached_eventgroup)2063 void application_impl::check_send_back_cached_event(
2064 service_t _service, instance_t _instance, event_t _event,
2065 eventgroup_t _eventgroup, bool *_send_back_cached_event,
2066 bool *_send_back_cached_eventgroup) {
2067 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
2068 *_send_back_cached_event = false;
2069 *_send_back_cached_eventgroup = false;
2070 bool already_subscribed(false);
2071 auto found_service = subscriptions_.find(_service);
2072 if(found_service != subscriptions_.end()) {
2073 auto found_instance = found_service->second.find(_instance);
2074 if (found_instance != found_service->second.end()) {
2075 auto found_event = found_instance->second.find(_event);
2076 if (found_event != found_instance->second.end()) {
2077 auto found_eventgroup = found_event->second.find(_eventgroup);
2078 if (found_eventgroup != found_event->second.end()) {
2079 already_subscribed = true;
2080 if (found_eventgroup->second) {
2081 // initial values for this event have already been
2082 // received, send back cached value
2083 if(_event == ANY_EVENT) {
2084 *_send_back_cached_eventgroup = true;
2085 } else {
2086 *_send_back_cached_event = true;
2087 }
2088 }
2089 }
2090 }
2091 }
2092 }
2093
2094 if (!already_subscribed) {
2095 subscriptions_[_service][_instance][_event][_eventgroup] = false;
2096 }
2097 }
2098
remove_subscription(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)2099 void application_impl::remove_subscription(service_t _service,
2100 instance_t _instance,
2101 eventgroup_t _eventgroup,
2102 event_t _event) {
2103
2104 {
2105 auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _event);
2106 std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
2107 subscription_state_.erase(its_tuple);
2108 }
2109
2110 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
2111
2112 auto found_service = subscriptions_.find(_service);
2113 if(found_service != subscriptions_.end()) {
2114 auto found_instance = found_service->second.find(_instance);
2115 if (found_instance != found_service->second.end()) {
2116 auto found_event = found_instance->second.find(_event);
2117 if (found_event != found_instance->second.end()) {
2118 if (found_event->second.erase(_eventgroup)) {
2119 if (!found_event->second.size()) {
2120 found_instance->second.erase(_event);
2121 if (!found_instance->second.size()) {
2122 found_service->second.erase(_instance);
2123 if (!found_service->second.size()) {
2124 subscriptions_.erase(_service);
2125 }
2126 }
2127 }
2128 }
2129 }
2130 }
2131 }
2132 }
2133
check_for_active_subscription(service_t _service,instance_t _instance,event_t _event)2134 bool application_impl::check_for_active_subscription(service_t _service,
2135 instance_t _instance,
2136 event_t _event) {
2137 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
2138 auto found_service = subscriptions_.find(_service);
2139 if(found_service != subscriptions_.end()) {
2140 auto found_instance = found_service->second.find(_instance);
2141 if (found_instance != found_service->second.end()) {
2142 auto found_event = found_instance->second.find(_event);
2143 if (found_event != found_instance->second.end()) {
2144 if (found_event->second.size()) {
2145 for (auto &eventgroup : found_event->second) {
2146 eventgroup.second = true;
2147 }
2148 return true;
2149 }
2150 } else {
2151 // Received a event which nobody yet explicitly subscribed to.
2152 // Check if someone subscribed to ANY_EVENT for one of
2153 // the received event's eventgroups
2154 auto found_any_event = found_instance->second.find(ANY_EVENT);
2155 if (found_any_event != found_instance->second.end()) {
2156 if (routing_) {
2157 std::shared_ptr<event> its_event = routing_->find_event(
2158 _service, _instance, _event);
2159 if (its_event) {
2160 for (const auto& eg : its_event->get_eventgroups()) {
2161 auto found_eventgroup = found_any_event->second.find(eg);
2162 if (found_eventgroup != found_any_event->second.end()) {
2163 // set the flag for initial event received to true
2164 // even if we might not already received all of the
2165 // eventgroups events.
2166 found_eventgroup->second = true;
2167 return true;
2168 }
2169 }
2170 }
2171 }
2172 }
2173 }
2174 }
2175 }
2176 // Return false if an event was received from:
2177 // - a service which nobody yet subscribed to
2178 // - a service instance which nobody yet subscribed to
2179 // - a service instance and nobody yet subscribed to one of the event's
2180 // eventgroups
2181 return false;
2182 }
2183
check_subscription_state(service_t _service,instance_t _instance,eventgroup_t _eventgroup,event_t _event)2184 bool application_impl::check_subscription_state(service_t _service, instance_t _instance,
2185 eventgroup_t _eventgroup, event_t _event) {
2186 bool is_acknowledged(false);
2187 bool should_subscribe(true);
2188 {
2189 auto its_tuple = std::make_tuple(_service, _instance, _eventgroup, _event);
2190 std::lock_guard<std::mutex> its_lock(subscriptions_state_mutex_);
2191 auto its_subscription_state = subscription_state_.find(its_tuple);
2192 if (its_subscription_state != subscription_state_.end()) {
2193 if (its_subscription_state->second !=
2194 subscription_state_e::SUBSCRIPTION_NOT_ACKNOWLEDGED) {
2195 // only return true if subscription is NACK
2196 // as only then we need to subscribe!
2197 should_subscribe = false;
2198 if (its_subscription_state->second ==
2199 subscription_state_e::SUBSCRIPTION_ACKNOWLEDGED) {
2200 is_acknowledged = true;
2201 }
2202 }
2203 } else {
2204 subscription_state_[its_tuple] = subscription_state_e::IS_SUBSCRIBING;
2205 }
2206 }
2207
2208 if (!should_subscribe && is_acknowledged) {
2209 // Deliver subscription state only if ACK has already received
2210 deliver_subscription_state(_service, _instance, _eventgroup, _event, 0 /* OK */);
2211 }
2212
2213 return should_subscribe;
2214 }
2215
print_blocking_call(const std::shared_ptr<sync_handler> & _handler)2216 void application_impl::print_blocking_call(const std::shared_ptr<sync_handler>& _handler) {
2217 switch (_handler->handler_type_) {
2218 case handler_type_e::AVAILABILITY:
2219 VSOMEIP_WARNING << "BLOCKING CALL AVAILABILITY("
2220 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
2221 << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
2222 << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "]";
2223 break;
2224 case handler_type_e::MESSAGE:
2225 VSOMEIP_WARNING << "BLOCKING CALL MESSAGE("
2226 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
2227 << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
2228 << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "."
2229 << std::hex << std::setw(4) << std::setfill('0') << _handler->method_id_ << ":"
2230 << std::hex << std::setw(4) << std::setfill('0') << _handler->session_id_ << "]";
2231 break;
2232 case handler_type_e::STATE:
2233 VSOMEIP_WARNING << "BLOCKING CALL STATE("
2234 << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")";
2235 break;
2236 case handler_type_e::SUBSCRIPTION:
2237 VSOMEIP_WARNING << "BLOCKING CALL SUBSCRIPTION("
2238 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<"): ["
2239 << std::hex << std::setw(4) << std::setfill('0') << _handler->service_id_ << "."
2240 << std::hex << std::setw(4) << std::setfill('0') << _handler->instance_id_ << "."
2241 << std::hex << std::setw(4) << std::setfill('0') << _handler->eventgroup_id_ << ":"
2242 << std::hex << std::setw(4) << std::setfill('0') << _handler->method_id_ << "]";
2243 break;
2244 case handler_type_e::OFFERED_SERVICES_INFO:
2245 VSOMEIP_WARNING << "BLOCKING CALL OFFERED_SERVICES_INFO("
2246 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")";
2247 break;
2248 case handler_type_e::WATCHDOG:
2249 VSOMEIP_WARNING << "BLOCKING CALL WATCHDOG("
2250 << std::hex << std::setw(4) << std::setfill('0') << get_client() <<")";
2251 break;
2252 case handler_type_e::UNKNOWN:
2253 VSOMEIP_WARNING << "BLOCKING CALL UNKNOWN("
2254 << std::hex << std::setw(4) << std::setfill('0') << get_client() << ")";
2255 break;
2256 }
2257 }
2258
2259
get_offered_services_async(offer_type_e _offer_type,offered_services_handler_t _handler)2260 void application_impl::get_offered_services_async(offer_type_e _offer_type, offered_services_handler_t _handler) {
2261 {
2262 std::lock_guard<std::mutex> its_lock(offered_services_handler_mutex_);
2263 offered_services_handler_ = _handler;
2264 }
2265
2266 if (!is_routing_manager_host_) {
2267 routing_->send_get_offered_services_info(get_client(), _offer_type);
2268 } else {
2269 std::vector<std::pair<service_t, instance_t>> its_services;
2270 auto its_routing_manager_host = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2271
2272 for (const auto& s : its_routing_manager_host->get_offered_services()) {
2273 for (const auto& i : s.second) {
2274 auto its_unreliable_endpoint = i.second->get_endpoint(false);
2275 auto its_reliable_endpoint = i.second->get_endpoint(true);
2276
2277 if (_offer_type == offer_type_e::OT_LOCAL) {
2278 if ( ((its_unreliable_endpoint && (its_unreliable_endpoint->get_local_port() == ILLEGAL_PORT))
2279 && (its_reliable_endpoint && (its_reliable_endpoint->get_local_port() == ILLEGAL_PORT)))
2280 || (!its_reliable_endpoint && !its_unreliable_endpoint)) {
2281 its_services.push_back(std::make_pair(s.first, i.first));
2282 }
2283 } else if (_offer_type == offer_type_e::OT_REMOTE) {
2284 if ((its_unreliable_endpoint && its_unreliable_endpoint->get_local_port() != ILLEGAL_PORT)
2285 || (its_reliable_endpoint && its_reliable_endpoint->get_local_port() != ILLEGAL_PORT)) {
2286 its_services.push_back(std::make_pair(s.first, i.first));
2287 }
2288 } else if (_offer_type == offer_type_e::OT_ALL) {
2289 its_services.push_back(std::make_pair(s.first, i.first));
2290 }
2291 }
2292 }
2293 on_offered_services_info(its_services);
2294 }
2295 return;
2296 }
2297
2298
on_offered_services_info(std::vector<std::pair<service_t,instance_t>> & _services)2299 void application_impl::on_offered_services_info(std::vector<std::pair<service_t, instance_t>> &_services) {
2300 bool has_offered_services_handler(false);
2301 offered_services_handler_t handler = nullptr;
2302 {
2303 std::lock_guard<std::mutex> its_lock(offered_services_handler_mutex_);
2304 if (offered_services_handler_) {
2305 has_offered_services_handler = true;
2306 handler = offered_services_handler_;
2307 }
2308 }
2309 if (has_offered_services_handler) {
2310 std::lock_guard<std::mutex> its_lock(handlers_mutex_);
2311 std::shared_ptr<sync_handler> its_sync_handler
2312 = std::make_shared<sync_handler>([handler, _services]() {
2313 handler(_services);
2314 });
2315 its_sync_handler->handler_type_ = handler_type_e::OFFERED_SERVICES_INFO;
2316 handlers_.push_back(its_sync_handler);
2317 dispatcher_condition_.notify_one();
2318 }
2319 }
2320
watchdog_cbk(boost::system::error_code const & _error)2321 void application_impl::watchdog_cbk(boost::system::error_code const &_error) {
2322 if (!_error) {
2323
2324 watchdog_handler_t handler = nullptr;
2325 {
2326 std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
2327 handler = watchdog_handler_;
2328 if (handler && std::chrono::seconds::zero() != watchdog_interval_) {
2329 watchdog_timer_.expires_from_now(watchdog_interval_);
2330 watchdog_timer_.async_wait(std::bind(&application_impl::watchdog_cbk,
2331 this, std::placeholders::_1));
2332 }
2333 }
2334
2335 if (handler) {
2336 std::lock_guard<std::mutex> its_lock(handlers_mutex_);
2337 std::shared_ptr<sync_handler> its_sync_handler
2338 = std::make_shared<sync_handler>([handler]() { handler(); });
2339 its_sync_handler->handler_type_ = handler_type_e::WATCHDOG;
2340 handlers_.push_back(its_sync_handler);
2341 dispatcher_condition_.notify_one();
2342 }
2343 }
2344 }
2345
set_watchdog_handler(watchdog_handler_t _handler,std::chrono::seconds _interval)2346 void application_impl::set_watchdog_handler(watchdog_handler_t _handler,
2347 std::chrono::seconds _interval) {
2348 if (_handler && std::chrono::seconds::zero() != _interval) {
2349 std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
2350 watchdog_handler_ = _handler;
2351 watchdog_interval_ = _interval;
2352 watchdog_timer_.expires_from_now(_interval);
2353 watchdog_timer_.async_wait(std::bind(&application_impl::watchdog_cbk,
2354 this, std::placeholders::_1));
2355 } else {
2356 std::lock_guard<std::mutex> its_lock(watchdog_timer_mutex_);
2357 watchdog_timer_.cancel();
2358 watchdog_handler_ = nullptr;
2359 watchdog_interval_ = std::chrono::seconds::zero();
2360 }
2361 }
2362
register_async_subscription_handler(service_t _service,instance_t _instance,eventgroup_t _eventgroup,async_subscription_handler_t _handler)2363 void application_impl::register_async_subscription_handler(service_t _service,
2364 instance_t _instance, eventgroup_t _eventgroup,
2365 async_subscription_handler_t _handler) {
2366
2367 std::lock_guard<std::mutex> its_lock(subscription_mutex_);
2368 subscription_[_service][_instance][_eventgroup] = std::make_pair(nullptr, _handler);;
2369 }
2370
register_sd_acceptance_handler(sd_acceptance_handler_t _handler)2371 void application_impl::register_sd_acceptance_handler(
2372 sd_acceptance_handler_t _handler) {
2373 if (is_routing() && routing_) {
2374 const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2375 rm_impl->register_sd_acceptance_handler(_handler);
2376 }
2377 }
2378
register_reboot_notification_handler(reboot_notification_handler_t _handler)2379 void application_impl::register_reboot_notification_handler(
2380 reboot_notification_handler_t _handler) {
2381 if (is_routing() && routing_) {
2382 const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2383 rm_impl->register_reboot_notification_handler(_handler);
2384 }
2385 }
2386
set_sd_acceptance_required(const remote_info_t & _remote,const std::string & _path,bool _enable)2387 void application_impl::set_sd_acceptance_required(
2388 const remote_info_t &_remote, const std::string &_path, bool _enable) {
2389
2390 if (!is_routing()) {
2391 return;
2392 }
2393
2394 const boost::asio::ip::address its_address(_remote.ip_.is_v4_ ?
2395 static_cast<boost::asio::ip::address>(boost::asio::ip::address_v4(
2396 _remote.ip_.address_.v4_)) :
2397 static_cast<boost::asio::ip::address>(boost::asio::ip::address_v6(
2398 _remote.ip_.address_.v6_)));
2399
2400 if (_remote.first_ == std::numeric_limits<std::uint16_t>::max()
2401 && _remote.last_ == 0) {
2402 // special case to (de)activate rules per IP
2403 configuration_->set_sd_acceptance_rules_active(its_address, _enable);
2404 return;
2405 }
2406
2407 configuration::port_range_t its_range { _remote.first_, _remote.last_ };
2408 configuration_->set_sd_acceptance_rule(its_address,
2409 its_range, port_type_e::PT_UNKNOWN,
2410 _path, _remote.is_reliable_, _enable, true);
2411
2412 if (_enable && routing_) {
2413 const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2414 rm_impl->sd_acceptance_enabled(its_address, its_range,
2415 _remote.is_reliable_);
2416 }
2417 }
2418
set_sd_acceptance_required(const sd_acceptance_map_type_t & _remotes,bool _enable)2419 void application_impl::set_sd_acceptance_required(
2420 const sd_acceptance_map_type_t& _remotes, bool _enable) {
2421
2422 (void)_remotes;
2423 (void)_enable;
2424
2425 #if 0
2426 if (!is_routing()) {
2427 return;
2428 }
2429
2430 configuration::sd_acceptance_rules_t its_rules;
2431 for (const auto& remote_info : _remotes) {
2432 const boost::asio::ip::address its_address(remote_info.first.ip_.is_v4_ ?
2433 static_cast<boost::asio::ip::address>(boost::asio::ip::address_v4(
2434 remote_info.first.ip_.address_.v4_)) :
2435 static_cast<boost::asio::ip::address>(boost::asio::ip::address_v6(
2436 remote_info.first.ip_.address_.v6_)));
2437 const boost::icl::interval<std::uint16_t>::interval_type its_interval =
2438 remote_info.first.is_range_ ?
2439 boost::icl::interval<std::uint16_t>::closed(
2440 remote_info.first.first_,
2441 ((remote_info.first.last_ == ANY_PORT) ?
2442 std::numeric_limits<std::uint16_t>::max() :
2443 remote_info.first.last_)) :
2444 boost::icl::interval<std::uint16_t>::closed(
2445 remote_info.first.first_, remote_info.first.first_);
2446
2447 const bool its_reliability = remote_info.first.is_reliable_;
2448
2449 const auto found_address = its_rules.find(its_address);
2450 if (found_address != its_rules.end()) {
2451 const auto found_reliability = found_address->second.second.find(
2452 its_reliability);
2453 if (found_reliability != found_address->second.second.end()) {
2454 found_reliability->second.insert(its_interval);
2455 } else {
2456 found_address->second.second.emplace(std::make_pair(
2457 its_reliability,
2458 boost::icl::interval_set<std::uint16_t>(its_interval)));
2459 }
2460 } else {
2461 its_rules.insert(std::make_pair(its_address,
2462 std::make_pair(remote_info.second,
2463 std::map<bool, boost::icl::interval_set<std::uint16_t>>(
2464 {{ its_reliability,
2465 boost::icl::interval_set<std::uint16_t>(
2466 its_interval) } }))));
2467 }
2468 }
2469
2470 configuration_->set_sd_acceptance_rules(its_rules, _enable);
2471 #endif
2472 }
2473
2474 application::sd_acceptance_map_type_t
get_sd_acceptance_required()2475 application_impl::get_sd_acceptance_required() {
2476
2477 sd_acceptance_map_type_t its_ret;
2478
2479 if (is_routing()) {
2480 for (const auto& e : configuration_->get_sd_acceptance_rules()) {
2481 remote_info_t its_remote_info;
2482 its_remote_info.ip_.is_v4_ = e.first.is_v4();
2483 if (its_remote_info.ip_.is_v4_) {
2484 its_remote_info.ip_.address_.v4_ = e.first.to_v4().to_bytes();
2485 } else {
2486 its_remote_info.ip_.address_.v6_ = e.first.to_v6().to_bytes();
2487 }
2488 for (const auto& reliability : e.second.second) {
2489 its_remote_info.is_reliable_ = reliability.first;
2490 for (const auto& port_range : reliability.second.first) {
2491 if (port_range.lower() == port_range.upper()) {
2492 its_remote_info.first_ = port_range.lower();
2493 its_remote_info.last_ = port_range.lower();
2494 its_remote_info.is_range_ = false;
2495 } else {
2496 its_remote_info.first_ = port_range.lower();
2497 its_remote_info.last_ = port_range.upper();
2498 its_remote_info.is_range_ = true;
2499 }
2500 its_ret[its_remote_info] = e.second.first;
2501 }
2502 for (const auto& port_range : reliability.second.second) {
2503 if (port_range.lower() == port_range.upper()) {
2504 its_remote_info.first_ = port_range.lower();
2505 its_remote_info.last_ = port_range.lower();
2506 its_remote_info.is_range_ = false;
2507 } else {
2508 its_remote_info.first_ = port_range.lower();
2509 its_remote_info.last_ = port_range.upper();
2510 its_remote_info.is_range_ = true;
2511 }
2512 its_ret[its_remote_info] = e.second.first;
2513 }
2514 }
2515 }
2516 }
2517
2518 return its_ret;
2519 }
2520
register_routing_ready_handler(routing_ready_handler_t _handler)2521 void application_impl::register_routing_ready_handler(
2522 routing_ready_handler_t _handler) {
2523 if (is_routing() && routing_) {
2524 const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2525 rm_impl->register_routing_ready_handler(_handler);
2526 }
2527 }
2528
register_routing_state_handler(routing_state_handler_t _handler)2529 void application_impl::register_routing_state_handler(
2530 routing_state_handler_t _handler) {
2531 if (is_routing() && routing_) {
2532 const auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2533 rm_impl->register_routing_state_handler(_handler);
2534 }
2535 }
2536
update_service_configuration(service_t _service,instance_t _instance,std::uint16_t _port,bool _reliable,bool _magic_cookies_enabled,bool _offer)2537 bool application_impl::update_service_configuration(service_t _service,
2538 instance_t _instance,
2539 std::uint16_t _port,
2540 bool _reliable,
2541 bool _magic_cookies_enabled,
2542 bool _offer) {
2543 bool ret = false;
2544 if (!is_routing_manager_host_) {
2545 VSOMEIP_ERROR << __func__ << " is only intended to be called by "
2546 "application acting as routing manager host";
2547 } else if (!routing_) {
2548 VSOMEIP_ERROR << __func__ << " routing is zero";
2549 } else {
2550 auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2551 if (rm_impl) {
2552 if (_offer) {
2553 ret = rm_impl->offer_service_remotely(_service, _instance,
2554 _port, _reliable, _magic_cookies_enabled);
2555 } else {
2556 ret = rm_impl->stop_offer_service_remotely(_service, _instance,
2557 _port, _reliable, _magic_cookies_enabled);
2558 }
2559 }
2560 }
2561 return ret;
2562 }
2563
update_security_policy_configuration(uint32_t _uid,uint32_t _gid,::std::shared_ptr<policy> _policy,std::shared_ptr<payload> _payload,security_update_handler_t _handler)2564 void application_impl::update_security_policy_configuration(uint32_t _uid,
2565 uint32_t _gid,
2566 ::std::shared_ptr<policy> _policy,
2567 std::shared_ptr<payload> _payload,
2568 security_update_handler_t _handler) {
2569 if (!is_routing()) {
2570 VSOMEIP_ERROR << __func__ << " is only intended to be called by "
2571 "application acting as routing manager host";
2572 } else if (!routing_) {
2573 VSOMEIP_ERROR << __func__ << " routing is zero";
2574 } else {
2575 auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2576 if (rm_impl) {
2577 rm_impl->update_security_policy_configuration(_uid, _gid, _policy, _payload, _handler);
2578 }
2579 }
2580 }
2581
remove_security_policy_configuration(uint32_t _uid,uint32_t _gid,security_update_handler_t _handler)2582 void application_impl::remove_security_policy_configuration(uint32_t _uid,
2583 uint32_t _gid,
2584 security_update_handler_t _handler) {
2585 if (!is_routing()) {
2586 VSOMEIP_ERROR << __func__ << " is only intended to be called by "
2587 "application acting as routing manager host";
2588 } else if (!routing_) {
2589 VSOMEIP_ERROR << __func__ << " routing is zero";
2590 } else {
2591 auto rm_impl = std::dynamic_pointer_cast<routing_manager_impl>(routing_);
2592 if (rm_impl) {
2593 rm_impl->remove_security_policy_configuration(_uid, _gid, _handler);
2594 }
2595 }
2596 }
2597
2598 } // namespace vsomeip_v3
2599