1 // Copyright (C) 2014-2018 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <vsomeip/constants.hpp>
7
8 #include <random>
9 #include <forward_list>
10 #include <thread>
11
12 #include <vsomeip/internal/logger.hpp>
13
14 #include "../include/constants.hpp"
15 #include "../include/defines.hpp"
16 #include "../include/deserializer.hpp"
17 #include "../include/enumeration_types.hpp"
18 #include "../include/eventgroupentry_impl.hpp"
19 #include "../include/ipv4_option_impl.hpp"
20 #include "../include/ipv6_option_impl.hpp"
21 #include "../include/selective_option_impl.hpp"
22 #include "../include/message_impl.hpp"
23 #include "../include/remote_subscription_ack.hpp"
24 #include "../include/request.hpp"
25 #include "../include/runtime.hpp"
26 #include "../include/service_discovery_host.hpp"
27 #include "../include/service_discovery_impl.hpp"
28 #include "../include/serviceentry_impl.hpp"
29 #include "../include/subscription.hpp"
30 #include "../../configuration/include/configuration.hpp"
31 #include "../../endpoints/include/endpoint.hpp"
32 #include "../../endpoints/include/client_endpoint.hpp"
33 #include "../../endpoints/include/endpoint_definition.hpp"
34 #include "../../endpoints/include/tcp_server_endpoint_impl.hpp"
35 #include "../../endpoints/include/udp_server_endpoint_impl.hpp"
36 #include "../../message/include/serializer.hpp"
37 #include "../../plugin/include/plugin_manager_impl.hpp"
38 #include "../../routing/include/event.hpp"
39 #include "../../routing/include/eventgroupinfo.hpp"
40 #include "../../routing/include/serviceinfo.hpp"
41 #include "../../utility/include/byteorder.hpp"
42
43 namespace vsomeip_v3 {
44 namespace sd {
45
service_discovery_impl(service_discovery_host * _host,const std::shared_ptr<configuration> & _configuration)46 service_discovery_impl::service_discovery_impl(
47 service_discovery_host *_host,
48 const std::shared_ptr<configuration>& _configuration)
49 : io_(_host->get_io()),
50 host_(_host),
51 configuration_(_configuration),
52 port_(VSOMEIP_SD_DEFAULT_PORT),
53 reliable_(false),
54 serializer_(std::make_shared<serializer>(
55 configuration_->get_buffer_shrink_threshold())),
56 deserializer_(std::make_shared<deserializer>(
57 configuration_->get_buffer_shrink_threshold())),
58 ttl_timer_(_host->get_io()),
59 ttl_timer_runtime_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 2),
60 ttl_(VSOMEIP_SD_DEFAULT_TTL),
61 subscription_expiration_timer_(_host->get_io()),
62 max_message_size_(VSOMEIP_MAX_UDP_SD_PAYLOAD),
63 initial_delay_(0),
64 offer_debounce_time_(VSOMEIP_SD_DEFAULT_OFFER_DEBOUNCE_TIME),
65 repetitions_base_delay_(VSOMEIP_SD_DEFAULT_REPETITIONS_BASE_DELAY),
66 repetitions_max_(VSOMEIP_SD_DEFAULT_REPETITIONS_MAX),
67 cyclic_offer_delay_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY),
68 offer_debounce_timer_(_host->get_io()),
69 find_debounce_time_(VSOMEIP_SD_DEFAULT_FIND_DEBOUNCE_TIME),
70 find_debounce_timer_(_host->get_io()),
71 main_phase_timer_(_host->get_io()),
72 is_suspended_(false),
73 is_diagnosis_(false),
74 last_msg_received_timer_(_host->get_io()),
75 last_msg_received_timer_timeout_(VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY +
76 (VSOMEIP_SD_DEFAULT_CYCLIC_OFFER_DELAY / 10)) {
77
78 next_subscription_expiration_ = std::chrono::steady_clock::now() + std::chrono::hours(24);
79 }
80
~service_discovery_impl()81 service_discovery_impl::~service_discovery_impl() {
82 }
83
84 boost::asio::io_service &
get_io()85 service_discovery_impl::get_io() {
86 return io_;
87 }
88
89 void
init()90 service_discovery_impl::init() {
91 runtime_ = std::dynamic_pointer_cast<sd::runtime>(
92 plugin_manager::get()->get_plugin(
93 plugin_type_e::SD_RUNTIME_PLUGIN, VSOMEIP_SD_LIBRARY));
94
95 unicast_ = configuration_->get_unicast_address();
96 sd_multicast_ = configuration_->get_sd_multicast();
97 boost::system::error_code ec;
98 sd_multicast_address_ = boost::asio::ip::address::from_string(sd_multicast_, ec);
99
100 port_ = configuration_->get_sd_port();
101 reliable_ = (configuration_->get_sd_protocol() == "tcp");
102 max_message_size_ = (reliable_ ? VSOMEIP_MAX_TCP_SD_PAYLOAD :
103 VSOMEIP_MAX_UDP_SD_PAYLOAD);
104
105 ttl_ = configuration_->get_sd_ttl();
106
107 // generate random initial delay based on initial delay min and max
108 std::uint32_t initial_delay_min =
109 configuration_->get_sd_initial_delay_min();
110 std::uint32_t initial_delay_max =
111 configuration_->get_sd_initial_delay_max();
112 if (initial_delay_min > initial_delay_max) {
113 const std::uint32_t tmp(initial_delay_min);
114 initial_delay_min = initial_delay_max;
115 initial_delay_max = tmp;
116 }
117
118 std::random_device r;
119 std::mt19937 e(r());
120 std::uniform_int_distribution<std::uint32_t> distribution(
121 initial_delay_min, initial_delay_max);
122 initial_delay_ = std::chrono::milliseconds(distribution(e));
123
124
125 repetitions_base_delay_ = std::chrono::milliseconds(
126 configuration_->get_sd_repetitions_base_delay());
127 repetitions_max_ = configuration_->get_sd_repetitions_max();
128 cyclic_offer_delay_ = std::chrono::milliseconds(
129 configuration_->get_sd_cyclic_offer_delay());
130 offer_debounce_time_ = std::chrono::milliseconds(
131 configuration_->get_sd_offer_debounce_time());
132 ttl_timer_runtime_ = cyclic_offer_delay_ / 2;
133
134 ttl_factor_offers_ = configuration_->get_ttl_factor_offers();
135 ttl_factor_subscriptions_ = configuration_->get_ttl_factor_subscribes();
136 last_msg_received_timer_timeout_ = cyclic_offer_delay_
137 + (cyclic_offer_delay_ / 10);
138 }
139
140 void
start()141 service_discovery_impl::start() {
142 if (!endpoint_) {
143 endpoint_ = host_->create_service_discovery_endpoint(
144 sd_multicast_, port_, reliable_);
145 if (!endpoint_) {
146 VSOMEIP_ERROR << "Couldn't start service discovery";
147 return;
148 }
149 }
150 {
151 std::lock_guard<std::mutex> its_lock(sessions_received_mutex_);
152 sessions_received_.clear();
153 }
154 {
155 std::lock_guard<std::mutex> its_lock(serialize_mutex_);
156 sessions_sent_.clear();
157 }
158
159 if (is_suspended_) {
160 // make sure to sent out FindService messages after resume
161 std::lock_guard<std::mutex> its_lock(requested_mutex_);
162 for (const auto &s : requested_) {
163 for (const auto &i : s.second) {
164 i.second->set_sent_counter(0);
165 }
166 }
167 if (endpoint_ && !reliable_) {
168 auto its_endpoint = std::dynamic_pointer_cast<
169 udp_server_endpoint_impl>(endpoint_);
170 if (its_endpoint)
171 its_endpoint->join(sd_multicast_);
172 }
173 }
174 is_suspended_ = false;
175 start_main_phase_timer();
176 start_offer_debounce_timer(true);
177 start_find_debounce_timer(true);
178 start_ttl_timer();
179 }
180
181 void
stop()182 service_discovery_impl::stop() {
183 is_suspended_ = true;
184 stop_ttl_timer();
185 stop_last_msg_received_timer();
186 }
187
188 void
request_service(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,ttl_t _ttl)189 service_discovery_impl::request_service(
190 service_t _service, instance_t _instance,
191 major_version_t _major, minor_version_t _minor,
192 ttl_t _ttl) {
193 std::lock_guard<std::mutex> its_lock(requested_mutex_);
194 auto find_service = requested_.find(_service);
195 if (find_service != requested_.end()) {
196 auto find_instance = find_service->second.find(_instance);
197 if (find_instance == find_service->second.end()) {
198 find_service->second[_instance]
199 = std::make_shared<request>(_major, _minor, _ttl);
200 }
201 } else {
202 requested_[_service][_instance]
203 = std::make_shared<request>(_major, _minor, _ttl);
204 }
205 }
206
207 void
release_service(service_t _service,instance_t _instance)208 service_discovery_impl::release_service(
209 service_t _service, instance_t _instance) {
210 std::lock_guard<std::mutex> its_lock(requested_mutex_);
211 auto find_service = requested_.find(_service);
212 if (find_service != requested_.end()) {
213 find_service->second.erase(_instance);
214 }
215 }
216
217 void
update_request(service_t _service,instance_t _instance)218 service_discovery_impl::update_request(service_t _service, instance_t _instance) {
219 std::lock_guard<std::mutex> its_lock(requested_mutex_);
220 auto find_service = requested_.find(_service);
221 if (find_service != requested_.end()) {
222 auto find_instance = find_service->second.find(_instance);
223 if (find_instance != find_service->second.end()) {
224 find_instance->second->set_sent_counter(
225 std::uint8_t(repetitions_max_ + 1));
226 }
227 }
228 }
229
230 void
subscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,ttl_t _ttl,client_t _client,const std::shared_ptr<eventgroupinfo> & _info)231 service_discovery_impl::subscribe(
232 service_t _service, instance_t _instance,
233 eventgroup_t _eventgroup, major_version_t _major,
234 ttl_t _ttl, client_t _client,
235 const std::shared_ptr<eventgroupinfo> &_info) {
236
237 if (is_suspended_) {
238 VSOMEIP_WARNING << "service_discovery::" << __func__
239 << ": Ignoring subscription as we are suspended.";
240 return;
241 }
242
243 #ifdef VSOMEIP_ENABLE_COMPAT
244 bool is_selective(_info ? _info->is_selective() : false);
245 #endif // VSOMEIP_ENABLE_COMPAT
246
247 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
248 auto found_service = subscribed_.find(_service);
249 if (found_service != subscribed_.end()) {
250 auto found_instance = found_service->second.find(_instance);
251 if (found_instance != found_service->second.end()) {
252 auto found_eventgroup = found_instance->second.find(_eventgroup);
253 if (found_eventgroup != found_instance->second.end()) {
254 auto its_subscription = found_eventgroup->second;
255 #ifdef VSOMEIP_ENABLE_COMPAT
256 if (!its_subscription->is_selective() && is_selective) {
257 its_subscription->set_selective(true);
258 its_subscription->remove_client(VSOMEIP_ROUTING_CLIENT);
259 for (const auto &e : _info->get_events()) {
260 for (const auto &c : e->get_subscribers(_eventgroup)) {
261 its_subscription->add_client(c);
262 }
263 }
264 }
265 #endif // VSOMEIP_ENABLE_COMPAT
266 if (its_subscription->get_major() != _major) {
267 VSOMEIP_ERROR
268 << "Subscriptions to different versions of the same "
269 "service instance are not supported!";
270 } else if (its_subscription->is_selective()) {
271 if (its_subscription->add_client(_client)) {
272 its_subscription->set_state(_client,
273 subscription_state_e::ST_NOT_ACKNOWLEDGED);
274 send_subscription(its_subscription,
275 _service, _instance, _eventgroup,
276 _client);
277 }
278 }
279 return;
280 }
281 }
282 }
283
284 std::shared_ptr<endpoint> its_reliable, its_unreliable;
285 get_subscription_endpoints(_service, _instance,
286 its_reliable, its_unreliable);
287
288 // New subscription
289 std::shared_ptr<subscription> its_subscription
290 = create_subscription(
291 _major, _ttl, its_reliable, its_unreliable, _info);
292
293 if (!its_subscription) {
294 VSOMEIP_ERROR << __func__
295 << ": creating subscription failed!";
296 return;
297 }
298
299 subscribed_[_service][_instance][_eventgroup] = its_subscription;
300
301 its_subscription->add_client(_client);
302 its_subscription->set_state(_client,
303 subscription_state_e::ST_NOT_ACKNOWLEDGED);
304
305 send_subscription(its_subscription,
306 _service, _instance, _eventgroup,
307 _client);
308 }
309
310 void
send_subscription(const std::shared_ptr<subscription> & _subscription,const service_t _service,const instance_t _instance,const eventgroup_t _eventgroup,const client_t _client)311 service_discovery_impl::send_subscription(
312 const std::shared_ptr<subscription> &_subscription,
313 const service_t _service, const instance_t _instance,
314 const eventgroup_t _eventgroup,
315 const client_t _client) {
316 (void)_client;
317
318 auto its_reliable = _subscription->get_endpoint(true);
319 auto its_unreliable = _subscription->get_endpoint(false);
320
321 boost::asio::ip::address its_address;
322 get_subscription_address(its_reliable, its_unreliable, its_address);
323 if (!its_address.is_unspecified()) {
324 entry_data_t its_data;
325 const reliability_type_e its_reliability_type =
326 get_eventgroup_reliability(_service, _instance, _eventgroup, _subscription);
327 if (its_reliability_type == reliability_type_e::RT_UNRELIABLE && its_unreliable) {
328 if (its_unreliable->is_established()) {
329 its_data = create_eventgroup_entry(_service, _instance,
330 _eventgroup, _subscription, its_reliability_type);
331 } else {
332 _subscription->set_udp_connection_established(false);
333 }
334 } else if (its_reliability_type == reliability_type_e::RT_RELIABLE && its_reliable) {
335 if (its_reliable->is_established()) {
336 its_data = create_eventgroup_entry(_service, _instance,
337 _eventgroup, _subscription, its_reliability_type);
338 } else {
339 _subscription->set_tcp_connection_established(false);
340 }
341 } else if (its_reliability_type == reliability_type_e::RT_BOTH &&
342 its_reliable && its_unreliable) {
343 if (its_reliable->is_established() && its_unreliable->is_established()) {
344 its_data = create_eventgroup_entry(_service, _instance,
345 _eventgroup, _subscription, its_reliability_type);
346 } else {
347 if (!its_reliable->is_established()) {
348 _subscription->set_tcp_connection_established(false);
349 }
350 if (!its_unreliable->is_established()) {
351 _subscription->set_udp_connection_established(false);
352 }
353 }
354 } else if (its_reliability_type == reliability_type_e::RT_UNKNOWN) {
355 VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine reliability type for subscription to ["
356 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
357 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
358 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] ";
359 }
360
361 if (its_data.entry_) {
362 // TODO: Implement a simple path, that sends a single message
363 auto its_current_message = std::make_shared<message_impl>();
364 std::vector<std::shared_ptr<message_impl> > its_messages;
365 its_messages.push_back(its_current_message);
366
367 add_entry_data(its_messages, its_data);
368
369 serialize_and_send(its_messages, its_address);
370 }
371 }
372 }
373
374 void
get_subscription_endpoints(service_t _service,instance_t _instance,std::shared_ptr<endpoint> & _reliable,std::shared_ptr<endpoint> & _unreliable) const375 service_discovery_impl::get_subscription_endpoints(
376 service_t _service, instance_t _instance,
377 std::shared_ptr<endpoint> &_reliable,
378 std::shared_ptr<endpoint> &_unreliable) const {
379 _unreliable = host_->find_or_create_remote_client(
380 _service, _instance, false);
381 _reliable = host_->find_or_create_remote_client(
382 _service, _instance, true);
383 }
384
385 void
get_subscription_address(const std::shared_ptr<endpoint> & _reliable,const std::shared_ptr<endpoint> & _unreliable,boost::asio::ip::address & _address) const386 service_discovery_impl::get_subscription_address(
387 const std::shared_ptr<endpoint> &_reliable,
388 const std::shared_ptr<endpoint> &_unreliable,
389 boost::asio::ip::address &_address) const {
390 if (_reliable) {
391 auto its_client_endpoint
392 = std::dynamic_pointer_cast<client_endpoint>(_reliable);
393 if (its_client_endpoint) {
394 its_client_endpoint->get_remote_address(_address);
395 return;
396 }
397 }
398 if (_unreliable) {
399 auto its_client_endpoint
400 = std::dynamic_pointer_cast<client_endpoint>(_unreliable);
401 if (its_client_endpoint) {
402 its_client_endpoint->get_remote_address(_address);
403 }
404 }
405 }
406
407 void
unsubscribe(service_t _service,instance_t _instance,eventgroup_t _eventgroup,client_t _client)408 service_discovery_impl::unsubscribe(service_t _service,
409 instance_t _instance, eventgroup_t _eventgroup, client_t _client) {
410 std::shared_ptr < runtime > its_runtime = runtime_.lock();
411 if (!its_runtime) {
412 return;
413 }
414
415 auto its_current_message = std::make_shared<message_impl>();
416
417 boost::asio::ip::address its_address;
418 {
419 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
420 auto found_service = subscribed_.find(_service);
421 if (found_service != subscribed_.end()) {
422 auto found_instance = found_service->second.find(_instance);
423 if (found_instance != found_service->second.end()) {
424 auto found_eventgroup = found_instance->second.find(_eventgroup);
425 if (found_eventgroup != found_instance->second.end()) {
426 auto its_subscription = found_eventgroup->second;
427 if (its_subscription->remove_client(_client)) {
428 auto its_reliable = its_subscription->get_endpoint(true);
429 auto its_unreliable = its_subscription->get_endpoint(false);
430 get_subscription_address(
431 its_reliable, its_unreliable, its_address);
432 if (!its_subscription->has_client()) {
433 its_subscription->set_ttl(0);
434 } else if (its_subscription->is_selective()) {
435 // create a dummy subscription object to unsubscribe
436 // the single client.
437 auto its_major = its_subscription->get_major();
438
439 its_subscription = std::make_shared<subscription>();
440 its_subscription->set_major(its_major);
441 its_subscription->set_ttl(0);
442 its_subscription->set_selective(true);
443 its_subscription->set_endpoint(its_reliable, true);
444 its_subscription->set_endpoint(its_unreliable, false);
445 }
446 }
447
448 // For selective subscriptions, the client must be added again
449 // to generate the selective option
450 if (its_subscription->is_selective())
451 its_subscription->add_client(_client);
452
453 const reliability_type_e its_reliability_type =
454 get_eventgroup_reliability(_service, _instance, _eventgroup, its_subscription);
455 auto its_data = create_eventgroup_entry(_service, _instance,
456 _eventgroup, its_subscription, its_reliability_type);
457 if (its_data.entry_)
458 its_current_message->add_entry_data(its_data.entry_, its_data.options_);
459
460 // Remove it again before updating (only impacts last unsubscribe)
461 if (its_subscription->is_selective())
462 (void)its_subscription->remove_client(_client);
463
464 // Ensure to update the "real" subscription
465 its_subscription = found_eventgroup->second;
466
467 // Finally update the subscriptions
468 if (!its_subscription->has_client()) {
469 found_instance->second.erase(found_eventgroup);
470 if (found_instance->second.size() == 0) {
471 found_service->second.erase(found_instance);
472 }
473 }
474 }
475 }
476 }
477 }
478
479 std::vector<std::shared_ptr<message_impl> > its_messages;
480 its_messages.push_back(its_current_message);
481
482 serialize_and_send(its_messages, its_address);
483 }
484
485 void
unsubscribe_all(service_t _service,instance_t _instance)486 service_discovery_impl::unsubscribe_all(
487 service_t _service, instance_t _instance) {
488
489 auto its_current_message = std::make_shared<message_impl>();;
490 boost::asio::ip::address its_address;
491
492 {
493 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
494 auto found_service = subscribed_.find(_service);
495 if (found_service != subscribed_.end()) {
496 auto found_instance = found_service->second.find(_instance);
497 if (found_instance != found_service->second.end()) {
498 for (auto &its_eventgroup : found_instance->second) {
499 auto its_subscription = its_eventgroup.second;
500 its_subscription->set_ttl(0);
501
502 const reliability_type_e its_reliability =
503 get_eventgroup_reliability(_service, _instance,
504 its_eventgroup.first, its_subscription);
505
506 auto its_data = create_eventgroup_entry(_service, _instance,
507 its_eventgroup.first, its_subscription, its_reliability);
508 auto its_reliable = its_subscription->get_endpoint(true);
509 auto its_unreliable = its_subscription->get_endpoint(false);
510 get_subscription_address(
511 its_reliable, its_unreliable, its_address);
512 if (its_data.entry_) {
513 its_current_message->add_entry_data(its_data.entry_, its_data.options_);
514 }
515 }
516 found_instance->second.clear();
517 }
518 }
519 }
520
521 std::vector<std::shared_ptr<message_impl> > its_messages;
522 its_messages.push_back(its_current_message);
523
524 serialize_and_send(its_messages, its_address);
525 }
526
527 void
unsubscribe_all_on_suspend()528 service_discovery_impl::unsubscribe_all_on_suspend() {
529
530 std::map<boost::asio::ip::address,
531 std::vector<std::shared_ptr<message_impl> > > its_stopsubscribes;
532
533 {
534 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
535 for (auto its_service : subscribed_) {
536 for (auto its_instance : its_service.second) {
537 for (auto &its_eventgroup : its_instance.second) {
538 boost::asio::ip::address its_address;
539 auto its_current_message = std::make_shared<message_impl>();
540 auto its_subscription = its_eventgroup.second;
541 its_subscription->set_ttl(0);
542 const reliability_type_e its_reliability =
543 get_eventgroup_reliability(its_service.first, its_instance.first,
544 its_eventgroup.first, its_subscription);
545 auto its_data = create_eventgroup_entry(its_service.first, its_instance.first,
546 its_eventgroup.first, its_subscription, its_reliability);
547 auto its_reliable = its_subscription->get_endpoint(true);
548 auto its_unreliable = its_subscription->get_endpoint(false);
549 get_subscription_address(
550 its_reliable, its_unreliable, its_address);
551 if (its_data.entry_
552 && its_current_message->add_entry_data(its_data.entry_, its_data.options_)) {
553 its_stopsubscribes[its_address].push_back(its_current_message);
554 } else {
555 VSOMEIP_WARNING << __func__ << ": Failed to create StopSubscribe entry for: "
556 << std::hex << std::setw(4) << std::setfill('0') << its_service.first << "."
557 << std::hex << std::setw(4) << std::setfill('0') << its_instance.first << "."
558 << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup.first
559 << " address: " << its_address.to_string();
560 }
561 }
562 its_instance.second.clear();
563 }
564 its_service.second.clear();
565 }
566 subscribed_.clear();
567 }
568
569 for (auto its_address : its_stopsubscribes) {
570 if (!serialize_and_send(its_address.second, its_address.first)) {
571 VSOMEIP_WARNING << __func__ << ": Failed to send StopSubscribe to address: "
572 << its_address.first.to_string();
573 }
574 }
575 }
576
577 void
remove_subscriptions(service_t _service,instance_t _instance)578 service_discovery_impl::remove_subscriptions(
579 service_t _service, instance_t _instance) {
580
581 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
582 auto found_service = subscribed_.find(_service);
583 if (found_service != subscribed_.end()) {
584 found_service->second.erase(_instance);
585 if (found_service->second.empty()) {
586 subscribed_.erase(found_service);
587 }
588 }
589 }
590
591 std::pair<session_t, bool>
get_session(const boost::asio::ip::address & _address)592 service_discovery_impl::get_session(
593 const boost::asio::ip::address &_address) {
594 std::pair<session_t, bool> its_session;
595 auto found_session = sessions_sent_.find(_address);
596 if (found_session == sessions_sent_.end()) {
597 its_session = sessions_sent_[_address] = { 1, true };
598 } else {
599 its_session = found_session->second;
600 }
601 return its_session;
602 }
603
604 void
increment_session(const boost::asio::ip::address & _address)605 service_discovery_impl::increment_session(
606 const boost::asio::ip::address &_address) {
607 auto found_session = sessions_sent_.find(_address);
608 if (found_session != sessions_sent_.end()) {
609 found_session->second.first++;
610 if (found_session->second.first == 0) {
611 found_session->second = { 1, false };
612 }
613 }
614 }
615
616 bool
is_reboot(const boost::asio::ip::address & _sender,const boost::asio::ip::address & _destination,bool _reboot_flag,session_t _session)617 service_discovery_impl::is_reboot(
618 const boost::asio::ip::address &_sender,
619 const boost::asio::ip::address &_destination,
620 bool _reboot_flag, session_t _session) {
621 bool result(false);
622
623 auto its_received = sessions_received_.find(_sender);
624 bool is_multicast = _destination.is_multicast();
625
626 // Initialize both sessions with 0. Thus, the session identifier
627 // for the session not being received from the network is stored
628 // as 0 and will never trigger the reboot detection.
629 session_t its_multicast_session(0), its_unicast_session(0);
630
631 // Initialize both flags with true. Thus, the flag not being
632 // received from the network will never trigger the reboot detection.
633 bool its_multicast_reboot_flag(true), its_unicast_reboot_flag(true);
634
635 if (is_multicast) {
636 its_multicast_session = _session;
637 its_multicast_reboot_flag = _reboot_flag;
638 } else {
639 its_unicast_session = _session;
640 its_unicast_reboot_flag = _reboot_flag;
641 }
642
643 if (its_received == sessions_received_.end()) {
644 sessions_received_[_sender]
645 = std::make_tuple(its_multicast_session, its_unicast_session,
646 its_multicast_reboot_flag, its_unicast_reboot_flag);
647 } else {
648 // Reboot detection: Either the flag has changed from false to true,
649 // or the session identifier overrun while the flag is true.
650 if (_reboot_flag
651 && ((is_multicast && !std::get<2>(its_received->second))
652 || (!is_multicast && !std::get<3>(its_received->second)))) {
653 result = true;
654 } else {
655 session_t its_old_session;
656 bool its_old_reboot_flag;
657
658 if (is_multicast) {
659 its_old_session = std::get<0>(its_received->second);
660 its_old_reboot_flag = std::get<2>(its_received->second);
661 } else {
662 its_old_session = std::get<1>(its_received->second);
663 its_old_reboot_flag = std::get<3>(its_received->second);
664 }
665
666 if (its_old_reboot_flag && _reboot_flag
667 && its_old_session >= _session) {
668 result = true;
669 }
670 }
671
672 if (result == false) {
673 // no reboot -> update session/flag
674 if (is_multicast) {
675 std::get<0>(its_received->second) = its_multicast_session;
676 std::get<2>(its_received->second) = its_multicast_reboot_flag;
677 } else {
678 std::get<1>(its_received->second) = its_unicast_session;
679 std::get<3>(its_received->second) = its_unicast_reboot_flag;
680 }
681 } else {
682 // reboot -> reset the sender data
683 sessions_received_.erase(_sender);
684 }
685 }
686
687 return result;
688 }
689
690 void
insert_find_entries(std::vector<std::shared_ptr<message_impl>> & _messages,const requests_t & _requests)691 service_discovery_impl::insert_find_entries(
692 std::vector<std::shared_ptr<message_impl> > &_messages,
693 const requests_t &_requests) {
694
695 entry_data_t its_data;
696 its_data.entry_ = its_data.other_ = nullptr;
697
698 for (const auto& its_service : _requests) {
699 for (const auto& its_instance : its_service.second) {
700 std::lock_guard<std::mutex> its_lock(requested_mutex_);
701 auto its_request = its_instance.second;
702
703 // check if release_service was called / offer was received
704 auto the_service = requested_.find(its_service.first);
705 if ( the_service != requested_.end() ) {
706 auto the_instance = the_service->second.find(its_instance.first);
707 if(the_instance != the_service->second.end() ) {
708 uint8_t its_sent_counter = its_request->get_sent_counter();
709 if (its_sent_counter != repetitions_max_ + 1) {
710 auto its_entry = std::make_shared<serviceentry_impl>();
711 if (its_entry) {
712 its_entry->set_type(entry_type_e::FIND_SERVICE);
713 its_entry->set_service(its_service.first);
714 its_entry->set_instance(its_instance.first);
715 its_entry->set_major_version(its_request->get_major());
716 its_entry->set_minor_version(its_request->get_minor());
717 its_entry->set_ttl(its_request->get_ttl());
718 its_sent_counter++;
719
720 its_request->set_sent_counter(its_sent_counter);
721
722 its_data.entry_ = its_entry;
723 add_entry_data(_messages, its_data);
724 } else {
725 VSOMEIP_ERROR << "Failed to create service entry!";
726 }
727 }
728 }
729 }
730 }
731 }
732 }
733
734 void
insert_offer_entries(std::vector<std::shared_ptr<message_impl>> & _messages,const services_t & _services,bool _ignore_phase)735 service_discovery_impl::insert_offer_entries(
736 std::vector<std::shared_ptr<message_impl> > &_messages,
737 const services_t &_services, bool _ignore_phase) {
738 for (const auto& its_service : _services) {
739 for (const auto& its_instance : its_service.second) {
740 if ((!is_suspended_)
741 && ((!is_diagnosis_)
742 || (is_diagnosis_
743 && !configuration_->is_someip(its_service.first,
744 its_instance.first)))) {
745 // Only insert services with configured endpoint(s)
746 if ((_ignore_phase || its_instance.second->is_in_mainphase())
747 && (its_instance.second->get_endpoint(false)
748 || its_instance.second->get_endpoint(true))) {
749 insert_offer_service(_messages, its_instance.second);
750 }
751 }
752 }
753 }
754 }
755
756 entry_data_t
create_eventgroup_entry(service_t _service,instance_t _instance,eventgroup_t _eventgroup,const std::shared_ptr<subscription> & _subscription,reliability_type_e _reliability_type)757 service_discovery_impl::create_eventgroup_entry(
758 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
759 const std::shared_ptr<subscription> &_subscription,
760 reliability_type_e _reliability_type) {
761
762 entry_data_t its_data;
763 its_data.entry_ = nullptr;
764 its_data.other_ = nullptr;
765
766 std::shared_ptr<endpoint> its_reliable_endpoint(_subscription->get_endpoint(true));
767 std::shared_ptr<endpoint> its_unreliable_endpoint(_subscription->get_endpoint(false));
768
769 bool insert_reliable(false);
770 bool insert_unreliable(false);
771 switch (_reliability_type) {
772 case reliability_type_e::RT_RELIABLE:
773 if (its_reliable_endpoint) {
774 insert_reliable = true;
775 } else {
776 VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
777 "reliable endpoint is zero: ["
778 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
779 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
780 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
781 }
782 break;
783 case reliability_type_e::RT_UNRELIABLE:
784 if (its_unreliable_endpoint) {
785 insert_unreliable = true;
786 } else {
787 VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
788 "unreliable endpoint is zero: ["
789 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
790 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
791 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
792 }
793 break;
794 case reliability_type_e::RT_BOTH:
795 if (its_reliable_endpoint && its_unreliable_endpoint) {
796 insert_reliable = true;
797 insert_unreliable = true;
798 } else {
799 VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
800 "endpoint is zero: ["
801 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
802 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
803 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
804 << " reliable: " << !!its_reliable_endpoint
805 << " unreliable: " << !!its_unreliable_endpoint;
806 }
807 break;
808 default:
809 break;
810 }
811
812 if (!insert_reliable && !insert_unreliable
813 && _reliability_type != reliability_type_e::RT_UNKNOWN) {
814 VSOMEIP_WARNING << __func__ << ": Didn't insert subscription as "
815 "subscription doesn't match reliability type: ["
816 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
817 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
818 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] "
819 << (uint16_t) _reliability_type;
820 return its_data;
821 }
822 std::shared_ptr<eventgroupentry_impl> its_entry, its_other;
823 if (insert_reliable && its_reliable_endpoint) {
824 const std::uint16_t its_port = its_reliable_endpoint->get_local_port();
825 if (its_port) {
826 its_entry = std::make_shared<eventgroupentry_impl>();
827 if (!its_entry) {
828 VSOMEIP_ERROR << __func__
829 << ": Could not create eventgroup entry.";
830 return its_data;
831 }
832
833 its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
834 its_entry->set_service(_service);
835 its_entry->set_instance(_instance);
836 its_entry->set_eventgroup(_eventgroup);
837 its_entry->set_counter(0);
838 its_entry->set_major_version(_subscription->get_major());
839 its_entry->set_ttl(_subscription->get_ttl());
840 its_data.entry_ = its_entry;
841
842 for (const auto& its_client : _subscription->get_clients()) {
843 if (_subscription->get_state(its_client)
844 == subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED) {
845 its_other = std::make_shared<eventgroupentry_impl>();
846 its_other->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
847 its_other->set_service(_service);
848 its_other->set_instance(_instance);
849 its_other->set_eventgroup(_eventgroup);
850 its_other->set_counter(0);
851 its_other->set_major_version(_subscription->get_major());
852 its_other->set_ttl(0);
853 its_data.other_ = its_other;
854 break;
855 }
856 }
857
858 auto its_option = create_ip_option(unicast_, its_port, true);
859 its_data.options_.push_back(its_option);
860 } else {
861 VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
862 "local reliable port is zero: ["
863 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
864 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
865 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
866 its_data.entry_ = nullptr;
867 its_data.other_ = nullptr;
868 return its_data;
869 }
870 }
871
872 if (insert_unreliable && its_unreliable_endpoint) {
873 const std::uint16_t its_port = its_unreliable_endpoint->get_local_port();
874 if (its_port) {
875 if (!its_entry) {
876 its_entry = std::make_shared<eventgroupentry_impl>();
877 if (!its_entry) {
878 VSOMEIP_ERROR << __func__
879 << ": Could not create eventgroup entry.";
880 return its_data;
881 }
882
883 its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
884 its_entry->set_service(_service);
885 its_entry->set_instance(_instance);
886 its_entry->set_eventgroup(_eventgroup);
887 its_entry->set_counter(0);
888 its_entry->set_major_version(_subscription->get_major());
889 its_entry->set_ttl(_subscription->get_ttl());
890 its_data.entry_ = its_entry;
891 }
892
893 for (const auto& its_client : _subscription->get_clients()) {
894 if (_subscription->get_state(its_client)
895 == subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED) {
896 if (!its_other) {
897 its_other = std::make_shared<eventgroupentry_impl>();
898 its_other->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP);
899 its_other->set_service(_service);
900 its_other->set_instance(_instance);
901 its_other->set_eventgroup(_eventgroup);
902 its_other->set_counter(0);
903 its_other->set_major_version(_subscription->get_major());
904 its_other->set_ttl(0);
905 its_data.other_ = its_other;
906 break;
907 }
908 }
909 }
910
911 auto its_option = create_ip_option(unicast_, its_port, false);
912 its_data.options_.push_back(its_option);
913 } else {
914 VSOMEIP_WARNING << __func__ << ": Cannot create subscription as "
915 " local unreliable port is zero: ["
916 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
917 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
918 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
919 its_data.entry_ = nullptr;
920 its_data.other_ = nullptr;
921 return its_data;
922 }
923 }
924
925 if (its_entry &&_subscription->is_selective()) {
926 auto its_selective_option = std::make_shared<selective_option_impl>();
927 its_selective_option->set_clients(_subscription->get_clients());
928 its_data.options_.push_back(its_selective_option);
929 }
930
931 if (its_entry && its_other) {
932 its_data.entry_ = its_other;
933 its_data.other_ = its_entry;
934 }
935
936 return its_data;
937 }
938
939 void
insert_subscription_ack(const std::shared_ptr<remote_subscription_ack> & _acknowledgement,const std::shared_ptr<eventgroupinfo> & _info,ttl_t _ttl,const std::shared_ptr<endpoint_definition> & _target,const std::set<client_t> & _clients)940 service_discovery_impl::insert_subscription_ack(
941 const std::shared_ptr<remote_subscription_ack>& _acknowledgement,
942 const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl,
943 const std::shared_ptr<endpoint_definition> &_target,
944 const std::set<client_t> &_clients) {
945 std::unique_lock<std::recursive_mutex> its_lock(_acknowledgement->get_lock());
946 auto its_message = _acknowledgement->get_current_message();
947
948 auto its_service = _info->get_service();
949 auto its_instance = _info->get_instance();
950 auto its_eventgroup = _info->get_eventgroup();
951 auto its_major = _info->get_major();
952
953 for (const auto& its_entry : its_message->get_entries()) {
954 if (its_entry->is_eventgroup_entry()) {
955 std::shared_ptr<eventgroupentry_impl> its_eventgroup_entry
956 = std::dynamic_pointer_cast<eventgroupentry_impl>(its_entry);
957 if (its_eventgroup_entry->get_type()
958 == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
959 && its_eventgroup_entry->get_service() == its_service
960 && its_eventgroup_entry->get_instance() == its_instance
961 && its_eventgroup_entry->get_eventgroup() == its_eventgroup
962 && its_eventgroup_entry->get_major_version() == its_major
963 && its_eventgroup_entry->get_ttl() == _ttl) {
964
965 if (_ttl > 0) {
966 if (_target) {
967 if (_target->is_reliable()) {
968 if (!its_eventgroup_entry->get_target(true)) {
969 its_eventgroup_entry->add_target(_target);
970 }
971 } else {
972 if (!its_eventgroup_entry->get_target(false)) {
973 its_eventgroup_entry->add_target(_target);
974 }
975 }
976 }
977 }
978
979 if (_clients.size() > 1 || (*(_clients.begin())) != 0) {
980 auto its_selective_option = its_eventgroup_entry->get_selective_option();
981 if (its_selective_option)
982 its_selective_option->set_clients(_clients);
983 }
984
985 return;
986 }
987 }
988 }
989
990 entry_data_t its_data;
991
992 std::shared_ptr<eventgroupentry_impl> its_entry
993 = std::make_shared<eventgroupentry_impl>();
994 its_entry->set_type(entry_type_e::SUBSCRIBE_EVENTGROUP_ACK);
995 its_entry->set_service(its_service);
996 its_entry->set_instance(its_instance);
997 its_entry->set_eventgroup(its_eventgroup);
998 its_entry->set_major_version(its_major);
999 its_entry->set_reserved(0);
1000 its_entry->set_counter(0);
1001 // SWS_SD_00315
1002 its_entry->set_ttl(_ttl);
1003 if (_ttl > 0) {
1004 if (_target) {
1005 its_entry->add_target(_target);
1006 }
1007
1008 boost::asio::ip::address its_address;
1009 uint16_t its_port;
1010 if (_info->get_multicast(its_address, its_port)
1011 && _info->get_threshold() > 0) {
1012 // SIP_SD_855
1013 // Only insert a multicast option for eventgroups with multicast threshold > 0
1014 auto its_option = create_ip_option(its_address, its_port, false);
1015 its_data.options_.push_back(its_option);
1016 }
1017 }
1018
1019 // Selective
1020 if (_clients.size() > 1 || (*(_clients.begin())) != 0) {
1021 auto its_selective_option = std::make_shared<selective_option_impl>();
1022 (void)its_selective_option->set_clients(_clients);
1023
1024 its_data.options_.push_back(its_selective_option);
1025 }
1026
1027 its_data.entry_ = its_entry;
1028 its_data.other_ = nullptr;
1029
1030 add_entry_data_to_remote_subscription_ack_msg(_acknowledgement, its_data);
1031 }
1032
1033 bool
send(bool _is_announcing)1034 service_discovery_impl::send(bool _is_announcing) {
1035 std::shared_ptr < runtime > its_runtime = runtime_.lock();
1036 if (its_runtime) {
1037 std::vector<std::shared_ptr<message_impl> > its_messages;
1038 std::shared_ptr<message_impl> its_message;
1039
1040 if (_is_announcing) {
1041 its_message = std::make_shared<message_impl>();
1042 its_messages.push_back(its_message);
1043
1044 std::lock_guard<std::mutex> its_lock(offer_mutex_);
1045 services_t its_offers = host_->get_offered_services();
1046 insert_offer_entries(its_messages, its_offers, true);
1047
1048 // Serialize and send
1049 return send(its_messages);
1050 }
1051 }
1052 return false;
1053 }
1054
1055 // Interface endpoint_host
1056 void
on_message(const byte_t * _data,length_t _length,const boost::asio::ip::address & _sender,const boost::asio::ip::address & _destination)1057 service_discovery_impl::on_message(
1058 const byte_t *_data, length_t _length,
1059 const boost::asio::ip::address &_sender,
1060 const boost::asio::ip::address &_destination) {
1061 #if 0
1062 std::stringstream msg;
1063 msg << "sdi::on_message: ";
1064 for (length_t i = 0; i < _length; ++i)
1065 msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
1066 VSOMEIP_INFO << msg.str();
1067 #endif
1068 std::lock_guard<std::mutex> its_lock(check_ttl_mutex_);
1069 std::lock_guard<std::mutex> its_session_lock(sessions_received_mutex_);
1070
1071 if(is_suspended_) {
1072 return;
1073 }
1074 // ignore all SD messages with source address equal to node's unicast address
1075 if (!check_source_address(_sender)) {
1076 return;
1077 }
1078 const bool received_via_mcast = (_destination == sd_multicast_address_);
1079 if (received_via_mcast) {
1080 static bool must_start_last_msg_received_timer(true);
1081 boost::system::error_code ec;
1082
1083 std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
1084 if (0 < last_msg_received_timer_.cancel(ec) || must_start_last_msg_received_timer) {
1085 must_start_last_msg_received_timer = false;
1086 last_msg_received_timer_.expires_from_now(
1087 last_msg_received_timer_timeout_, ec);
1088 last_msg_received_timer_.async_wait(
1089 std::bind(&service_discovery_impl::on_last_msg_received_timer_expired,
1090 shared_from_this(), std::placeholders::_1));
1091 }
1092 }
1093
1094 current_remote_address_ = _sender;
1095 deserializer_->set_data(_data, _length);
1096 std::shared_ptr<message_impl> its_message(
1097 deserializer_->deserialize_sd_message());
1098 deserializer_->reset();
1099 if (its_message) {
1100 // ignore all messages which are sent with invalid header fields
1101 if(!check_static_header_fields(its_message)) {
1102 return;
1103 }
1104 // Expire all subscriptions / services in case of reboot
1105 if (is_reboot(_sender, _destination,
1106 its_message->get_reboot_flag(), its_message->get_session())) {
1107 VSOMEIP_INFO << "Reboot detected: IP=" << _sender.to_string();
1108 remove_remote_offer_type_by_ip(_sender);
1109 host_->expire_subscriptions(_sender);
1110 host_->expire_services(_sender);
1111 if (reboot_notification_handler_) {
1112 ip_address_t ip;
1113 if (_sender.is_v4()) {
1114 ip.address_.v4_ = _sender.to_v4().to_bytes();
1115 ip.is_v4_ = true;
1116 } else {
1117 ip.address_.v6_ = _sender.to_v6().to_bytes();
1118 ip.is_v4_ = false;
1119 }
1120 reboot_notification_handler_(ip);
1121 }
1122 }
1123
1124 std::vector<std::shared_ptr<option_impl> > its_options
1125 = its_message->get_options();
1126
1127 std::shared_ptr<runtime> its_runtime = runtime_.lock();
1128 if (!its_runtime) {
1129 return;
1130 }
1131
1132 auto its_acknowledgement = std::make_shared<remote_subscription_ack>(_sender);
1133
1134 std::vector<std::shared_ptr<message_impl> > its_resubscribes;
1135 its_resubscribes.push_back(std::make_shared<message_impl>());
1136
1137 const message_impl::entries_t& its_entries = its_message->get_entries();
1138 const message_impl::entries_t::const_iterator its_end = its_entries.end();
1139 bool is_stop_subscribe_subscribe(false);
1140 bool force_initial_events(false);
1141
1142 bool sd_acceptance_queried(false);
1143 expired_ports_t expired_ports;
1144 sd_acceptance_state_t accept_state(expired_ports);
1145
1146 for (auto iter = its_entries.begin(); iter != its_end; iter++) {
1147 if (!sd_acceptance_queried) {
1148 sd_acceptance_queried = true;
1149 if (sd_acceptance_handler_) {
1150 accept_state.sd_acceptance_required_
1151 = configuration_->is_protected_device(_sender);
1152 remote_info_t remote;
1153 remote.first_ = ANY_PORT;
1154 remote.last_ = ANY_PORT;
1155 remote.is_range_ = false;
1156 if (_sender.is_v4()) {
1157 remote.ip_.address_.v4_ = _sender.to_v4().to_bytes();
1158 remote.ip_.is_v4_ = true;
1159 } else {
1160 remote.ip_.address_.v6_ = _sender.to_v6().to_bytes();
1161 remote.ip_.is_v4_ = false;
1162 }
1163 accept_state.accept_entries_ = sd_acceptance_handler_(remote);
1164 } else {
1165 accept_state.accept_entries_ = true;
1166 }
1167 }
1168 if ((*iter)->is_service_entry()) {
1169 std::shared_ptr<serviceentry_impl> its_service_entry
1170 = std::dynamic_pointer_cast<serviceentry_impl>(*iter);
1171 bool its_unicast_flag = its_message->get_unicast_flag();
1172 process_serviceentry(its_service_entry, its_options,
1173 its_unicast_flag, its_resubscribes,
1174 received_via_mcast, accept_state);
1175 } else {
1176 std::shared_ptr<eventgroupentry_impl> its_eventgroup_entry
1177 = std::dynamic_pointer_cast<eventgroupentry_impl>(*iter);
1178
1179 bool must_process(true);
1180 // Do we need to process it?
1181 if (its_eventgroup_entry->get_type()
1182 == entry_type_e::SUBSCRIBE_EVENTGROUP) {
1183 must_process = !has_same(iter, its_end, its_options);
1184 }
1185
1186 if (must_process) {
1187 if (is_stop_subscribe_subscribe) {
1188 force_initial_events = true;
1189 }
1190 is_stop_subscribe_subscribe =
1191 check_stop_subscribe_subscribe(iter, its_end, its_options);
1192 process_eventgroupentry(its_eventgroup_entry, its_options,
1193 its_acknowledgement, _sender, _destination,
1194 is_stop_subscribe_subscribe, force_initial_events,
1195 accept_state);
1196 }
1197
1198 }
1199 }
1200
1201 {
1202 std::unique_lock<std::recursive_mutex> its_lock(its_acknowledgement->get_lock());
1203 its_acknowledgement->complete();
1204 // TODO: Check the following logic...
1205 if (its_acknowledgement->has_subscription()) {
1206 update_acknowledgement(its_acknowledgement);
1207 } else {
1208 if (!its_acknowledgement->is_pending()
1209 && !its_acknowledgement->is_done()) {
1210 send_subscription_ack(its_acknowledgement);
1211 }
1212 }
1213 }
1214
1215 // check resubscriptions for validity
1216 for (auto iter = its_resubscribes.begin(); iter != its_resubscribes.end();) {
1217 if ((*iter)->get_entries().empty() || (*iter)->get_options().empty()) {
1218 iter = its_resubscribes.erase(iter);
1219 } else {
1220 iter++;
1221 }
1222 }
1223 if (!its_resubscribes.empty()) {
1224 serialize_and_send(its_resubscribes, _sender);
1225 }
1226 } else {
1227 VSOMEIP_ERROR << "service_discovery_impl::" << __func__ << ": Deserialization error.";
1228 return;
1229 }
1230 }
1231
1232 // Entry processing
1233 void
process_serviceentry(std::shared_ptr<serviceentry_impl> & _entry,const std::vector<std::shared_ptr<option_impl>> & _options,bool _unicast_flag,std::vector<std::shared_ptr<message_impl>> & _resubscribes,bool _received_via_mcast,const sd_acceptance_state_t & _sd_ac_state)1234 service_discovery_impl::process_serviceentry(
1235 std::shared_ptr<serviceentry_impl> &_entry,
1236 const std::vector<std::shared_ptr<option_impl> > &_options,
1237 bool _unicast_flag,
1238 std::vector<std::shared_ptr<message_impl> > &_resubscribes,
1239 bool _received_via_mcast,
1240 const sd_acceptance_state_t& _sd_ac_state) {
1241
1242 // Read service info from entry
1243 entry_type_e its_type = _entry->get_type();
1244 service_t its_service = _entry->get_service();
1245 instance_t its_instance = _entry->get_instance();
1246 major_version_t its_major = _entry->get_major_version();
1247 minor_version_t its_minor = _entry->get_minor_version();
1248 ttl_t its_ttl = _entry->get_ttl();
1249
1250 // Read address info from options
1251 boost::asio::ip::address its_reliable_address;
1252 uint16_t its_reliable_port(ILLEGAL_PORT);
1253
1254 boost::asio::ip::address its_unreliable_address;
1255 uint16_t its_unreliable_port(ILLEGAL_PORT);
1256
1257 for (auto i : { 1, 2 }) {
1258 for (auto its_index : _entry->get_options(uint8_t(i))) {
1259 if( _options.size() > its_index ) {
1260 std::shared_ptr < option_impl > its_option = _options[its_index];
1261
1262 switch (its_option->get_type()) {
1263 case option_type_e::IP4_ENDPOINT: {
1264 std::shared_ptr < ipv4_option_impl > its_ipv4_option =
1265 std::dynamic_pointer_cast < ipv4_option_impl
1266 > (its_option);
1267
1268 boost::asio::ip::address_v4 its_ipv4_address(
1269 its_ipv4_option->get_address());
1270
1271 if (its_ipv4_option->get_layer_four_protocol()
1272 == layer_four_protocol_e::UDP) {
1273
1274
1275 its_unreliable_address = its_ipv4_address;
1276 its_unreliable_port = its_ipv4_option->get_port();
1277 } else {
1278 its_reliable_address = its_ipv4_address;
1279 its_reliable_port = its_ipv4_option->get_port();
1280 }
1281 break;
1282 }
1283 case option_type_e::IP6_ENDPOINT: {
1284 std::shared_ptr < ipv6_option_impl > its_ipv6_option =
1285 std::dynamic_pointer_cast < ipv6_option_impl
1286 > (its_option);
1287
1288 boost::asio::ip::address_v6 its_ipv6_address(
1289 its_ipv6_option->get_address());
1290
1291 if (its_ipv6_option->get_layer_four_protocol()
1292 == layer_four_protocol_e::UDP) {
1293 its_unreliable_address = its_ipv6_address;
1294 its_unreliable_port = its_ipv6_option->get_port();
1295 } else {
1296 its_reliable_address = its_ipv6_address;
1297 its_reliable_port = its_ipv6_option->get_port();
1298 }
1299 break;
1300 }
1301 case option_type_e::IP4_MULTICAST:
1302 case option_type_e::IP6_MULTICAST:
1303 break;
1304 case option_type_e::CONFIGURATION:
1305 break;
1306 case option_type_e::UNKNOWN:
1307 default:
1308 VSOMEIP_ERROR << __func__ << ": Unsupported service option";
1309 break;
1310 }
1311 }
1312 }
1313 }
1314
1315 if (0 < its_ttl) {
1316 switch(its_type) {
1317 case entry_type_e::FIND_SERVICE:
1318 process_findservice_serviceentry(its_service, its_instance,
1319 its_major, its_minor, _unicast_flag);
1320 break;
1321 case entry_type_e::OFFER_SERVICE:
1322 process_offerservice_serviceentry(its_service, its_instance,
1323 its_major, its_minor, its_ttl,
1324 its_reliable_address, its_reliable_port,
1325 its_unreliable_address, its_unreliable_port, _resubscribes,
1326 _received_via_mcast, _sd_ac_state);
1327 break;
1328 case entry_type_e::UNKNOWN:
1329 default:
1330 VSOMEIP_ERROR << __func__ << ": Unsupported service entry type";
1331 }
1332 } else if (its_type != entry_type_e::FIND_SERVICE
1333 && (!_sd_ac_state.sd_acceptance_required_ || _sd_ac_state.accept_entries_)) {
1334 // stop sending find service in repetition phase
1335 update_request(its_service, its_instance);
1336
1337 remove_remote_offer_type(its_service, its_instance,
1338 its_reliable_address, its_reliable_port,
1339 its_unreliable_address, its_unreliable_port);
1340 remove_subscriptions(its_service, its_instance);
1341 if (!is_diagnosis_ && !is_suspended_) {
1342 host_->del_routing_info(its_service, its_instance,
1343 (its_reliable_port != ILLEGAL_PORT),
1344 (its_unreliable_port != ILLEGAL_PORT));
1345 }
1346 }
1347 }
1348
1349 void
process_offerservice_serviceentry(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,ttl_t _ttl,const boost::asio::ip::address & _reliable_address,uint16_t _reliable_port,const boost::asio::ip::address & _unreliable_address,uint16_t _unreliable_port,std::vector<std::shared_ptr<message_impl>> & _resubscribes,bool _received_via_mcast,const sd_acceptance_state_t & _sd_ac_state)1350 service_discovery_impl::process_offerservice_serviceentry(
1351 service_t _service, instance_t _instance, major_version_t _major,
1352 minor_version_t _minor, ttl_t _ttl,
1353 const boost::asio::ip::address &_reliable_address,
1354 uint16_t _reliable_port,
1355 const boost::asio::ip::address &_unreliable_address,
1356 uint16_t _unreliable_port,
1357 std::vector<std::shared_ptr<message_impl> > &_resubscribes,
1358 bool _received_via_mcast, const sd_acceptance_state_t& _sd_ac_state) {
1359 std::shared_ptr < runtime > its_runtime = runtime_.lock();
1360 if (!its_runtime)
1361 return;
1362
1363 bool is_secure = configuration_->is_secure_service(_service, _instance);
1364 if (is_secure &&
1365 ((_reliable_port != ILLEGAL_PORT &&
1366 !configuration_->is_secure_port(_reliable_address, _reliable_port, true))
1367 || (_unreliable_port != ILLEGAL_PORT
1368 && !configuration_->is_secure_port(_unreliable_address, _unreliable_port, false)))) {
1369
1370 VSOMEIP_WARNING << __func__ << ": Ignoring offer of ["
1371 << std::hex << std::setw(4) << std::setfill('0') << _service
1372 << "."
1373 << std::hex << std::setw(4) << std::setfill('0') << _instance
1374 << "]";
1375 return;
1376 }
1377
1378 // stop sending find service in repetition phase
1379 update_request(_service, _instance);
1380
1381 remote_offer_type_e offer_type(remote_offer_type_e::UNKNOWN);
1382 if (_reliable_port != ILLEGAL_PORT
1383 && _unreliable_port != ILLEGAL_PORT
1384 && !_reliable_address.is_unspecified()
1385 && !_unreliable_address.is_unspecified()) {
1386 offer_type = remote_offer_type_e::RELIABLE_UNRELIABLE;
1387 } else if (_unreliable_port != ILLEGAL_PORT
1388 && !_unreliable_address.is_unspecified()) {
1389 offer_type = remote_offer_type_e::UNRELIABLE;
1390 } else if (_reliable_port != ILLEGAL_PORT
1391 && !_reliable_address.is_unspecified()) {
1392 offer_type = remote_offer_type_e::RELIABLE;
1393 } else {
1394 VSOMEIP_WARNING << __func__ << ": Unknown remote offer type ["
1395 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1396 << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
1397 return; // Unknown remote offer type --> no way to access it!
1398 }
1399
1400 if (_sd_ac_state.sd_acceptance_required_) {
1401
1402 auto expire_subscriptions_and_services =
1403 [this, &_sd_ac_state](const boost::asio::ip::address& _address,
1404 std::uint16_t _port, bool _reliable) {
1405 const auto its_port_pair = std::make_pair(_reliable, _port);
1406 if (_sd_ac_state.expired_ports_.find(its_port_pair) ==
1407 _sd_ac_state.expired_ports_.end()) {
1408 VSOMEIP_WARNING << "service_discovery_impl::" << __func__
1409 << ": Do not accept offer from "
1410 << _address.to_string() << ":"
1411 << std::dec << _port << " reliable=" << _reliable;
1412 remove_remote_offer_type_by_ip(_address, _port, _reliable);
1413 host_->expire_subscriptions(_address, _port, _reliable);
1414 host_->expire_services(_address, _port, _reliable);
1415 _sd_ac_state.expired_ports_.insert(its_port_pair);
1416 }
1417 };
1418
1419 // return if the registered sd_acceptance handler returned false
1420 // and for the provided port sd_acceptance is necessary
1421 switch (offer_type) {
1422 case remote_offer_type_e::UNRELIABLE:
1423 if (!_sd_ac_state.accept_entries_
1424 && configuration_->is_protected_port(
1425 _unreliable_address, _unreliable_port, false)) {
1426 expire_subscriptions_and_services(_unreliable_address,
1427 _unreliable_port, false);
1428 return;
1429 }
1430 break;
1431 case remote_offer_type_e::RELIABLE:
1432 if (!_sd_ac_state.accept_entries_
1433 && configuration_->is_protected_port(
1434 _reliable_address, _reliable_port, true)) {
1435 expire_subscriptions_and_services(_reliable_address,
1436 _reliable_port, true);
1437 return;
1438 }
1439 break;
1440 case remote_offer_type_e::RELIABLE_UNRELIABLE:
1441 if (!_sd_ac_state.accept_entries_
1442 && (configuration_->is_protected_port(
1443 _unreliable_address, _unreliable_port, false)
1444 || configuration_->is_protected_port(
1445 _reliable_address, _reliable_port, true))) {
1446 expire_subscriptions_and_services(_unreliable_address,
1447 _unreliable_port, false);
1448 expire_subscriptions_and_services(_reliable_address,
1449 _reliable_port, true);
1450 return;
1451 }
1452 break;
1453 case remote_offer_type_e::UNKNOWN:
1454 default:
1455 break;
1456 }
1457 }
1458
1459 if (update_remote_offer_type(_service, _instance, offer_type,
1460 _reliable_address, _reliable_port,
1461 _unreliable_address, _unreliable_port)) {
1462 VSOMEIP_WARNING << __func__ << ": Remote offer type changed ["
1463 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1464 << std::hex << std::setw(4) << std::setfill('0') << _instance << "]";
1465
1466 // Only update eventgroup reliability type if it was initially unknown
1467 auto its_eventgroups = host_->get_subscribed_eventgroups(_service, _instance);
1468 for (auto eg : its_eventgroups) {
1469 auto its_info = host_->find_eventgroup(
1470 _service, _instance, eg);
1471 if (its_info) {
1472 if (its_info->is_reliability_auto_mode()) {
1473 reliability_type_e its_reliability(reliability_type_e::RT_UNKNOWN);
1474 switch (offer_type) {
1475 case remote_offer_type_e::RELIABLE:
1476 its_reliability = reliability_type_e::RT_RELIABLE;
1477 break;
1478 case remote_offer_type_e::UNRELIABLE:
1479 its_reliability = reliability_type_e::RT_UNRELIABLE;
1480 break;
1481 case remote_offer_type_e::RELIABLE_UNRELIABLE:
1482 its_reliability = reliability_type_e::RT_BOTH;
1483 break;
1484 default:
1485 ;
1486 }
1487 if (its_reliability != reliability_type_e::RT_UNKNOWN
1488 && its_reliability != its_info->get_reliability()) {
1489 VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type changed ["
1490 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
1491 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
1492 << std::hex << std::setw(4) << std::setfill('0') << eg << "]"
1493 << " using reliability type: "
1494 << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability;
1495 its_info->set_reliability(its_reliability);
1496 }
1497 }
1498 }
1499 }
1500 }
1501
1502
1503 // No need to resubscribe for unicast offers
1504 if (_received_via_mcast) {
1505 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
1506 auto found_service = subscribed_.find(_service);
1507 if (found_service != subscribed_.end()) {
1508 auto found_instance = found_service->second.find(_instance);
1509 if (found_instance != found_service->second.end()) {
1510 if (0 < found_instance->second.size()) {
1511 for (const auto& its_eventgroup : found_instance->second) {
1512 auto its_subscription = its_eventgroup.second;
1513 std::shared_ptr<endpoint> its_reliable, its_unreliable;
1514 get_subscription_endpoints(_service, _instance,
1515 its_reliable, its_unreliable);
1516 its_subscription->set_endpoint(its_reliable, true);
1517 its_subscription->set_endpoint(its_unreliable, false);
1518 for (const auto& its_client : its_subscription->get_clients()) {
1519 if (its_subscription->get_state(its_client)
1520 == subscription_state_e::ST_ACKNOWLEDGED) {
1521 its_subscription->set_state(its_client,
1522 subscription_state_e::ST_RESUBSCRIBING);
1523 } else {
1524 its_subscription->set_state(its_client,
1525 subscription_state_e::ST_RESUBSCRIBING_NOT_ACKNOWLEDGED);
1526 }
1527 }
1528 const reliability_type_e its_reliability =
1529 get_eventgroup_reliability(_service, _instance,
1530 its_eventgroup.first, its_subscription);
1531
1532 auto its_data = create_eventgroup_entry(_service, _instance,
1533 its_eventgroup.first, its_subscription, its_reliability);
1534 if (its_data.entry_) {
1535 add_entry_data(_resubscribes, its_data);
1536 }
1537 for (const auto& its_client : its_subscription->get_clients()) {
1538 its_subscription->set_state(its_client,
1539 subscription_state_e::ST_NOT_ACKNOWLEDGED);
1540 }
1541 }
1542 }
1543 }
1544 }
1545 }
1546
1547 host_->add_routing_info(_service, _instance,
1548 _major, _minor,
1549 _ttl * get_ttl_factor(_service, _instance, ttl_factor_offers_),
1550 _reliable_address, _reliable_port,
1551 _unreliable_address, _unreliable_port);
1552 }
1553
1554 void
process_findservice_serviceentry(service_t _service,instance_t _instance,major_version_t _major,minor_version_t _minor,bool _unicast_flag)1555 service_discovery_impl::process_findservice_serviceentry(
1556 service_t _service, instance_t _instance,
1557 major_version_t _major, minor_version_t _minor,
1558 bool _unicast_flag) {
1559
1560 if (_instance != ANY_INSTANCE) {
1561 std::shared_ptr<serviceinfo> its_info = host_->get_offered_service(
1562 _service, _instance);
1563 if (its_info) {
1564 if (_major == ANY_MAJOR || _major == its_info->get_major()) {
1565 if (_minor == 0xFFFFFFFF || _minor <= its_info->get_minor()) {
1566 if (its_info->get_endpoint(false) || its_info->get_endpoint(true)) {
1567 send_uni_or_multicast_offerservice(its_info, _unicast_flag);
1568 }
1569 }
1570 }
1571 }
1572 } else {
1573 std::map<instance_t, std::shared_ptr<serviceinfo>> offered_instances =
1574 host_->get_offered_service_instances(_service);
1575 // send back all available instances
1576 for (const auto &found_instance : offered_instances) {
1577 auto its_info = found_instance.second;
1578 if (_major == ANY_MAJOR || _major == its_info->get_major()) {
1579 if (_minor == 0xFFFFFFFF || _minor <= its_info->get_minor()) {
1580 if (its_info->get_endpoint(false) || its_info->get_endpoint(true)) {
1581 send_uni_or_multicast_offerservice(its_info, _unicast_flag);
1582 }
1583 }
1584 }
1585 }
1586 }
1587 }
1588
1589 void
send_unicast_offer_service(const std::shared_ptr<const serviceinfo> & _info)1590 service_discovery_impl::send_unicast_offer_service(
1591 const std::shared_ptr<const serviceinfo> &_info) {
1592 std::shared_ptr<runtime> its_runtime = runtime_.lock();
1593 if (!its_runtime) {
1594 return;
1595 }
1596
1597 auto its_offer_message(std::make_shared<message_impl>());
1598 std::vector<std::shared_ptr<message_impl> > its_messages;
1599 its_messages.push_back(its_offer_message);
1600
1601 insert_offer_service(its_messages, _info);
1602
1603 serialize_and_send(its_messages, current_remote_address_);
1604 }
1605
1606 void
send_multicast_offer_service(const std::shared_ptr<const serviceinfo> & _info)1607 service_discovery_impl::send_multicast_offer_service(
1608 const std::shared_ptr<const serviceinfo> &_info) {
1609 auto its_offer_message(std::make_shared<message_impl>());
1610 std::vector<std::shared_ptr<message_impl> > its_messages;
1611 its_messages.push_back(its_offer_message);
1612
1613 insert_offer_service(its_messages, _info);
1614
1615 serialize_and_send(its_messages, current_remote_address_);
1616 }
1617
1618 void
on_endpoint_connected(service_t _service,instance_t _instance,const std::shared_ptr<endpoint> & _endpoint)1619 service_discovery_impl::on_endpoint_connected(
1620 service_t _service, instance_t _instance,
1621 const std::shared_ptr<endpoint> &_endpoint) {
1622 std::shared_ptr<runtime> its_runtime = runtime_.lock();
1623 if (!its_runtime) {
1624 return;
1625 }
1626
1627 // TODO: Simplify this method! It is not clear, why we need to check
1628 // both endpoints here although the method is always called for a
1629 // single one.
1630
1631 std::vector<std::shared_ptr<message_impl> > its_messages;
1632 its_messages.push_back(std::make_shared<message_impl>());
1633 boost::asio::ip::address its_address;
1634
1635 std::shared_ptr<endpoint> its_dummy;
1636 if (_endpoint->is_reliable())
1637 get_subscription_address(_endpoint, its_dummy, its_address);
1638 else
1639 get_subscription_address(its_dummy, _endpoint, its_address);
1640
1641 {
1642 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
1643 auto found_service = subscribed_.find(_service);
1644 if (found_service != subscribed_.end()) {
1645 auto found_instance = found_service->second.find(_instance);
1646 if (found_instance != found_service->second.end()) {
1647 if (0 < found_instance->second.size()) {
1648 for (const auto &its_eventgroup : found_instance->second) {
1649 std::shared_ptr<subscription> its_subscription(its_eventgroup.second);
1650 if (its_subscription) {
1651 if (!its_subscription->is_tcp_connection_established() ||
1652 !its_subscription->is_udp_connection_established()) {
1653 const std::shared_ptr<const endpoint> its_reliable_endpoint(
1654 its_subscription->get_endpoint(true));
1655 const std::shared_ptr<const endpoint> its_unreliable_endpoint(
1656 its_subscription->get_endpoint(false));
1657 if (its_reliable_endpoint && its_reliable_endpoint->is_established()) {
1658 if (its_reliable_endpoint.get() == _endpoint.get()) {
1659 // mark tcp as established
1660 its_subscription->set_tcp_connection_established(true);
1661 }
1662 }
1663 if (its_unreliable_endpoint && its_unreliable_endpoint->is_established()) {
1664 if (its_unreliable_endpoint.get() == _endpoint.get()) {
1665 // mark udp as established
1666 its_subscription->set_udp_connection_established(true);
1667 }
1668 }
1669
1670 if ((its_reliable_endpoint && its_unreliable_endpoint &&
1671 its_subscription->is_tcp_connection_established() &&
1672 its_subscription->is_udp_connection_established()) ||
1673 (its_reliable_endpoint && !its_unreliable_endpoint &&
1674 its_subscription->is_tcp_connection_established()) ||
1675 (its_unreliable_endpoint && !its_reliable_endpoint &&
1676 its_subscription->is_udp_connection_established())) {
1677
1678 std::shared_ptr<endpoint> its_unreliable;
1679 std::shared_ptr<endpoint> its_reliable;
1680 get_subscription_endpoints(_service, _instance,
1681 its_reliable, its_unreliable);
1682 get_subscription_address(its_reliable, its_unreliable, its_address);
1683
1684 its_subscription->set_endpoint(its_reliable, true);
1685 its_subscription->set_endpoint(its_unreliable, false);
1686 for (const auto its_client : its_subscription->get_clients())
1687 its_subscription->set_state(its_client,
1688 subscription_state_e::ST_NOT_ACKNOWLEDGED);
1689
1690 const reliability_type_e its_reliability_type =
1691 get_eventgroup_reliability(_service, _instance, its_eventgroup.first, its_subscription);
1692 auto its_data = create_eventgroup_entry(_service, _instance,
1693 its_eventgroup.first, its_subscription, its_reliability_type);
1694
1695 if (its_data.entry_) {
1696 add_entry_data(its_messages, its_data);
1697 }
1698 }
1699 }
1700 }
1701 }
1702 }
1703 }
1704 }
1705 }
1706
1707 serialize_and_send(its_messages, its_address);
1708 }
1709
1710 std::shared_ptr<option_impl>
create_ip_option(const boost::asio::ip::address & _address,uint16_t _port,bool _is_reliable) const1711 service_discovery_impl::create_ip_option(
1712 const boost::asio::ip::address &_address, uint16_t _port,
1713 bool _is_reliable) const {
1714 std::shared_ptr<option_impl> its_option;
1715 if (_address.is_v4()) {
1716 its_option = std::make_shared<ipv4_option_impl>(
1717 _address, _port, _is_reliable);
1718 } else {
1719 its_option = std::make_shared<ipv6_option_impl>(
1720 _address, _port, _is_reliable);
1721 }
1722 return its_option;
1723 }
1724
1725 void
insert_offer_service(std::vector<std::shared_ptr<message_impl>> & _messages,const std::shared_ptr<const serviceinfo> & _info)1726 service_discovery_impl::insert_offer_service(
1727 std::vector<std::shared_ptr<message_impl> > &_messages,
1728 const std::shared_ptr<const serviceinfo> &_info) {
1729 entry_data_t its_data;
1730 its_data.entry_ = its_data.other_ = nullptr;
1731
1732 std::shared_ptr<endpoint> its_reliable = _info->get_endpoint(true);
1733 if (its_reliable) {
1734 auto its_new_option = create_ip_option(unicast_,
1735 its_reliable->get_local_port(), true);
1736 its_data.options_.push_back(its_new_option);
1737 }
1738
1739 std::shared_ptr<endpoint> its_unreliable = _info->get_endpoint(false);
1740 if (its_unreliable) {
1741 auto its_new_option = create_ip_option(unicast_,
1742 its_unreliable->get_local_port(), false);
1743 its_data.options_.push_back(its_new_option);
1744 }
1745
1746 auto its_entry = std::make_shared<serviceentry_impl>();
1747 if (its_entry) {
1748 its_data.entry_ = its_entry;
1749
1750 its_entry->set_type(entry_type_e::OFFER_SERVICE);
1751 its_entry->set_service(_info->get_service());
1752 its_entry->set_instance(_info->get_instance());
1753 its_entry->set_major_version(_info->get_major());
1754 its_entry->set_minor_version(_info->get_minor());
1755
1756 ttl_t its_ttl = _info->get_ttl();
1757 if (its_ttl > 0)
1758 its_ttl = ttl_;
1759 its_entry->set_ttl(its_ttl);
1760
1761 add_entry_data(_messages, its_data);
1762 } else {
1763 VSOMEIP_ERROR << __func__ << ": Failed to create service entry.";
1764 }
1765 }
1766
1767 void
process_eventgroupentry(std::shared_ptr<eventgroupentry_impl> & _entry,const std::vector<std::shared_ptr<option_impl>> & _options,std::shared_ptr<remote_subscription_ack> & _acknowledgement,const boost::asio::ip::address & _sender,const boost::asio::ip::address & _destination,bool _is_stop_subscribe_subscribe,bool _force_initial_events,const sd_acceptance_state_t & _sd_ac_state)1768 service_discovery_impl::process_eventgroupentry(
1769 std::shared_ptr<eventgroupentry_impl> &_entry,
1770 const std::vector<std::shared_ptr<option_impl> > &_options,
1771 std::shared_ptr<remote_subscription_ack> &_acknowledgement,
1772 const boost::asio::ip::address &_sender,
1773 const boost::asio::ip::address &_destination,
1774 bool _is_stop_subscribe_subscribe, bool _force_initial_events,
1775 const sd_acceptance_state_t& _sd_ac_state) {
1776
1777 std::set<client_t> its_clients({0}); // maybe overridden for selectives
1778
1779 auto its_sender = _acknowledgement->get_target_address();
1780 auto its_session = _entry->get_owning_message()->get_session();
1781
1782 service_t its_service = _entry->get_service();
1783 instance_t its_instance = _entry->get_instance();
1784 eventgroup_t its_eventgroup = _entry->get_eventgroup();
1785 entry_type_e its_type = _entry->get_type();
1786 major_version_t its_major = _entry->get_major_version();
1787 ttl_t its_ttl = _entry->get_ttl();
1788
1789 auto its_info = host_->find_eventgroup(
1790 its_service, its_instance, its_eventgroup);
1791 if (!its_info) {
1792 if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) {
1793 // We received a subscription for a non-existing eventgroup.
1794 // --> Create dummy eventgroupinfo to send Nack.
1795 its_info = std::make_shared<eventgroupinfo>(its_service, its_instance,
1796 its_eventgroup, its_major, its_ttl, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS);
1797 boost::system::error_code ec;
1798 VSOMEIP_ERROR << __func__
1799 << ": Received a SubscribeEventGroup entry for unknown eventgroup "
1800 << " from: " << its_sender.to_string(ec) << " for: ["
1801 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
1802 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
1803 << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup
1804 << "] session: " << std::hex << std::setw(4) << std::setfill('0')
1805 << its_session << ", ttl: " << its_ttl;
1806 if (its_ttl > 0) {
1807 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1808 }
1809 } else {
1810 // We received a subscription [n]ack for an eventgroup that does not exist.
1811 // --> Remove subscription.
1812 unsubscribe(its_service, its_instance, its_eventgroup, VSOMEIP_ROUTING_CLIENT);
1813
1814 boost::system::error_code ec;
1815 VSOMEIP_WARNING << __func__
1816 << ": Received a SubscribeEventGroup[N]Ack entry for unknown eventgroup "
1817 << " from: " << its_sender.to_string(ec) << " for: ["
1818 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
1819 << std::hex << std::setw(4) << std::setfill('0') << its_instance << "."
1820 << std::hex << std::setw(4) << std::setfill('0') << its_eventgroup
1821 << "] session: " << std::hex << std::setw(4) << std::setfill('0')
1822 << its_session << ", ttl: " << its_ttl;
1823 }
1824 return;
1825 }
1826
1827 if (_entry->get_owning_message()->get_return_code() != return_code) {
1828 boost::system::error_code ec;
1829 VSOMEIP_ERROR << __func__ << ": Invalid return code in SOMEIP/SD header "
1830 << its_sender.to_string(ec) << " session: "
1831 << std::hex << std::setw(4) << std::setfill('0') << its_session;
1832 if (its_ttl > 0) {
1833 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1834 }
1835 return;
1836 }
1837
1838 if(its_type == entry_type_e::SUBSCRIBE_EVENTGROUP) {
1839 if (_destination.is_multicast() ) {
1840 boost::system::error_code ec;
1841 VSOMEIP_ERROR << __func__
1842 << ": Received a SubscribeEventGroup entry on multicast address "
1843 << its_sender.to_string(ec) << " session: "
1844 << std::hex << std::setw(4) << std::setfill('0') << its_session;
1845 if (its_ttl > 0) {
1846 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1847 }
1848 return;
1849 }
1850 if (_entry->get_num_options(1) == 0
1851 && _entry->get_num_options(2) == 0) {
1852 boost::system::error_code ec;
1853 VSOMEIP_ERROR << __func__
1854 << ": Invalid number of options in SubscribeEventGroup entry "
1855 << its_sender.to_string(ec) << " session: "
1856 << std::hex << std::setw(4) << std::setfill('0') << its_session;
1857 if (its_ttl > 0) {
1858 // increase number of required acks by one as number required acks
1859 // is calculated based on the number of referenced options
1860 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1861 }
1862 return;
1863 }
1864 if (_entry->get_owning_message()->get_options_length() < 12) {
1865 boost::system::error_code ec;
1866 VSOMEIP_ERROR << __func__
1867 << ": Invalid options length in SOMEIP/SD message "
1868 << its_sender.to_string(ec) << " session: "
1869 << std::hex << std::setw(4) << std::setfill('0') << its_session;
1870 if (its_ttl > 0) {
1871 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1872 }
1873 return;
1874 }
1875 if (_options.size()
1876 // cast is needed in order to get unsigned type since int will be promoted
1877 // by the + operator on 16 bit or higher machines.
1878 < static_cast<std::vector<std::shared_ptr<option_impl>>::size_type>(
1879 (_entry->get_num_options(1)) + (_entry->get_num_options(2)))) {
1880 boost::system::error_code ec;
1881 VSOMEIP_ERROR << __func__
1882 << "Fewer options in SOMEIP/SD message than "
1883 "referenced in EventGroup entry or malformed option received "
1884 << its_sender.to_string(ec) << " session: "
1885 << std::hex << std::setw(4) << std::setfill('0') << its_session;
1886 if (its_ttl > 0) {
1887 // set to 0 to ensure an answer containing at least this subscribe_nack is sent out
1888 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1889 }
1890 return;
1891 }
1892 if (_entry->get_owning_message()->get_someip_length()
1893 < _entry->get_owning_message()->get_length()
1894 && its_ttl > 0) {
1895 boost::system::error_code ec;
1896 VSOMEIP_ERROR << __func__
1897 << ": SOME/IP length field in SubscribeEventGroup message header: ["
1898 << std::dec << _entry->get_owning_message()->get_someip_length()
1899 << "] bytes, is shorter than length of deserialized message: ["
1900 << (uint32_t) _entry->get_owning_message()->get_length() << "] bytes. "
1901 << its_sender.to_string(ec) << " session: "
1902 << std::hex << std::setw(4) << std::setfill('0') << its_session;
1903 return;
1904 }
1905 }
1906
1907 boost::asio::ip::address its_first_address;
1908 uint16_t its_first_port(ILLEGAL_PORT);
1909 bool is_first_reliable(false);
1910 boost::asio::ip::address its_second_address;
1911 uint16_t its_second_port(ILLEGAL_PORT);
1912 bool is_second_reliable(false);
1913
1914 for (auto i : { 1, 2 }) {
1915 for (auto its_index : _entry->get_options(uint8_t(i))) {
1916 std::shared_ptr < option_impl > its_option;
1917 try {
1918 its_option = _options.at(its_index);
1919 } catch(const std::out_of_range& e) {
1920 #ifdef _WIN32
1921 e; // silence MSVC warning C4101
1922 #endif
1923 boost::system::error_code ec;
1924 VSOMEIP_ERROR << __func__
1925 << ": Fewer options in SD message than "
1926 "referenced in EventGroup entry for "
1927 "option run number: "
1928 << i << " "
1929 << its_sender.to_string(ec) << " session: "
1930 << std::hex << std::setw(4) << std::setfill('0')
1931 << its_session;
1932 if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type && its_ttl > 0) {
1933 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1934 }
1935 return;
1936 }
1937 switch (its_option->get_type()) {
1938 case option_type_e::IP4_ENDPOINT: {
1939 if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) {
1940 std::shared_ptr < ipv4_option_impl > its_ipv4_option =
1941 std::dynamic_pointer_cast < ipv4_option_impl
1942 > (its_option);
1943
1944 boost::asio::ip::address_v4 its_ipv4_address(
1945 its_ipv4_option->get_address());
1946 if (!check_layer_four_protocol(its_ipv4_option)) {
1947 if (its_ttl > 0) {
1948 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1949 }
1950 return;
1951 }
1952
1953 if (its_first_port == ILLEGAL_PORT) {
1954 its_first_address = its_ipv4_address;
1955 its_first_port = its_ipv4_option->get_port();
1956 is_first_reliable = (its_ipv4_option->get_layer_four_protocol()
1957 == layer_four_protocol_e::TCP);
1958
1959 // reject subscription referencing two conflicting options of same protocol type
1960 // ID: SIP_SD_1144
1961 if (is_first_reliable == is_second_reliable
1962 && its_second_port != ILLEGAL_PORT) {
1963 if (its_ttl > 0) {
1964 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1965 }
1966 boost::system::error_code ec;
1967 VSOMEIP_ERROR << __func__
1968 << ": Multiple IPv4 endpoint options of same kind referenced! "
1969 << its_sender.to_string(ec) << " session: "
1970 << std::hex << std::setw(4) << std::setfill('0') << its_session
1971 << " is_first_reliable: " << is_first_reliable;
1972 return;
1973 }
1974
1975 if (!check_ipv4_address(its_first_address)
1976 || 0 == its_first_port) {
1977 if (its_ttl > 0) {
1978 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
1979 }
1980 boost::system::error_code ec;
1981 VSOMEIP_ERROR << __func__
1982 << ": Invalid port or IP address in first IPv4 endpoint option specified! "
1983 << its_sender.to_string(ec) << " session: "
1984 << std::hex << std::setw(4) << std::setfill('0') << its_session;
1985 return;
1986 }
1987 } else
1988 if (its_second_port == ILLEGAL_PORT) {
1989 its_second_address = its_ipv4_address;
1990 its_second_port = its_ipv4_option->get_port();
1991 is_second_reliable = (its_ipv4_option->get_layer_four_protocol()
1992 == layer_four_protocol_e::TCP);
1993
1994 // reject subscription referencing two conflicting options of same protocol type
1995 // ID: SIP_SD_1144
1996 if (is_second_reliable == is_first_reliable
1997 && its_first_port != ILLEGAL_PORT) {
1998 if (its_ttl > 0) {
1999 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
2000 }
2001 boost::system::error_code ec;
2002 VSOMEIP_ERROR << __func__
2003 << ": Multiple IPv4 endpoint options of same kind referenced! "
2004 << its_sender.to_string(ec) << " session: "
2005 << std::hex << std::setw(4) << std::setfill('0') << its_session
2006 << " is_second_reliable: " << is_second_reliable;
2007 return;
2008 }
2009
2010 if (!check_ipv4_address(its_second_address)
2011 || 0 == its_second_port) {
2012 if (its_ttl > 0) {
2013 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
2014 }
2015 boost::system::error_code ec;
2016 VSOMEIP_ERROR << __func__
2017 << ": Invalid port or IP address in second IPv4 endpoint option specified! "
2018 << its_sender.to_string(ec) << " session: "
2019 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2020 return;
2021 }
2022 } else {
2023 // TODO: error message, too many endpoint options!
2024 }
2025 } else {
2026 boost::system::error_code ec;
2027 VSOMEIP_ERROR << __func__
2028 << ": Invalid eventgroup option (IPv4 Endpoint)"
2029 << its_sender.to_string(ec) << " session: "
2030 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2031 }
2032 break;
2033 }
2034 case option_type_e::IP6_ENDPOINT: {
2035 if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) {
2036 std::shared_ptr < ipv6_option_impl > its_ipv6_option =
2037 std::dynamic_pointer_cast < ipv6_option_impl
2038 > (its_option);
2039
2040 boost::asio::ip::address_v6 its_ipv6_address(
2041 its_ipv6_option->get_address());
2042 if (!check_layer_four_protocol(its_ipv6_option)) {
2043 if(its_ttl > 0) {
2044 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
2045 }
2046 boost::system::error_code ec;
2047 VSOMEIP_ERROR << "Invalid layer 4 protocol type in IPv6 endpoint option specified! "
2048 << its_sender.to_string(ec) << " session: "
2049 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2050 return;
2051 }
2052
2053 if (its_first_port == ILLEGAL_PORT) {
2054 its_first_address = its_ipv6_address;
2055 its_first_port = its_ipv6_option->get_port();
2056 is_first_reliable = (its_ipv6_option->get_layer_four_protocol()
2057 == layer_four_protocol_e::TCP);
2058
2059 if (is_first_reliable == is_second_reliable
2060 && its_second_port != ILLEGAL_PORT) {
2061 if (its_ttl > 0) {
2062 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
2063 }
2064 boost::system::error_code ec;
2065 VSOMEIP_ERROR << __func__
2066 << ": Multiple IPv6 endpoint options of same kind referenced! "
2067 << its_sender.to_string(ec) << " session: "
2068 << std::hex << std::setw(4) << std::setfill('0') << its_session
2069 << " is_first_reliable: " << is_first_reliable;
2070 return;
2071 }
2072 } else
2073 if (its_second_port == ILLEGAL_PORT) {
2074 its_second_address = its_ipv6_address;
2075 its_second_port = its_ipv6_option->get_port();
2076 is_second_reliable = (its_ipv6_option->get_layer_four_protocol()
2077 == layer_four_protocol_e::TCP);
2078
2079 if (is_second_reliable == is_first_reliable
2080 && its_first_port != ILLEGAL_PORT) {
2081 if (its_ttl > 0) {
2082 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
2083 }
2084 boost::system::error_code ec;
2085 VSOMEIP_ERROR << __func__
2086 << ": Multiple IPv6 endpoint options of same kind referenced! "
2087 << its_sender.to_string(ec) << " session: "
2088 << std::hex << std::setw(4) << std::setfill('0') << its_session
2089 << " is_second_reliable: " << is_second_reliable;
2090 return;
2091 }
2092 } else {
2093 // TODO: error message, too many endpoint options!
2094 }
2095 } else {
2096 boost::system::error_code ec;
2097 VSOMEIP_ERROR << __func__
2098 << ": Invalid eventgroup option (IPv6 Endpoint) "
2099 << its_sender.to_string(ec) << " session: "
2100 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2101 }
2102 break;
2103 }
2104 case option_type_e::IP4_MULTICAST:
2105 if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) {
2106 std::shared_ptr < ipv4_option_impl > its_ipv4_option =
2107 std::dynamic_pointer_cast < ipv4_option_impl
2108 > (its_option);
2109
2110 boost::asio::ip::address_v4 its_ipv4_address(
2111 its_ipv4_option->get_address());
2112
2113 if (its_first_port == ILLEGAL_PORT) {
2114 its_first_address = its_ipv4_address;
2115 its_first_port = its_ipv4_option->get_port();
2116 } else
2117 if (its_second_port == ILLEGAL_PORT) {
2118 its_second_address = its_ipv4_address;
2119 its_second_port = its_ipv4_option->get_port();
2120 } else {
2121 // TODO: error message, too many endpoint options!
2122 }
2123 // ID: SIP_SD_946, ID: SIP_SD_1144
2124 if (its_first_port != ILLEGAL_PORT
2125 && its_second_port != ILLEGAL_PORT) {
2126 boost::system::error_code ec;
2127 VSOMEIP_ERROR << __func__
2128 << ": Multiple IPv4 multicast options referenced! "
2129 << its_sender.to_string(ec) << " session: "
2130 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2131 return;
2132 }
2133 } else {
2134 boost::system::error_code ec;
2135 VSOMEIP_ERROR << __func__
2136 << ": Invalid eventgroup option (IPv4 Multicast) "
2137 << its_sender.to_string(ec) << " session: "
2138 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2139 }
2140 break;
2141 case option_type_e::IP6_MULTICAST:
2142 if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) {
2143 std::shared_ptr < ipv6_option_impl > its_ipv6_option =
2144 std::dynamic_pointer_cast < ipv6_option_impl
2145 > (its_option);
2146
2147 boost::asio::ip::address_v6 its_ipv6_address(
2148 its_ipv6_option->get_address());
2149
2150 if (its_first_port == ILLEGAL_PORT) {
2151 its_first_address = its_ipv6_address;
2152 its_first_port = its_ipv6_option->get_port();
2153 } else
2154 if (its_second_port == ILLEGAL_PORT) {
2155 its_second_address = its_ipv6_address;
2156 its_second_port = its_ipv6_option->get_port();
2157 } else {
2158 // TODO: error message, too many endpoint options!
2159 }
2160 // ID: SIP_SD_946, ID: SIP_SD_1144
2161 if (its_first_port != ILLEGAL_PORT
2162 && its_second_port != ILLEGAL_PORT) {
2163 boost::system::error_code ec;
2164 VSOMEIP_ERROR << __func__
2165 << "Multiple IPv6 multicast options referenced! "
2166 << its_sender.to_string(ec) << " session: "
2167 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2168 return;
2169 }
2170 } else {
2171 boost::system::error_code ec;
2172 VSOMEIP_ERROR << __func__
2173 << ": Invalid eventgroup option (IPv6 Multicast) "
2174 << its_sender.to_string(ec) << " session: "
2175 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2176 }
2177 break;
2178 case option_type_e::CONFIGURATION: {
2179 break;
2180 }
2181 case option_type_e::SELECTIVE: {
2182 auto its_selective_option
2183 = std::dynamic_pointer_cast<selective_option_impl>(its_option);
2184 if (its_selective_option) {
2185 its_clients = its_selective_option->get_clients();
2186 }
2187 break;
2188 }
2189 case option_type_e::UNKNOWN:
2190 default:
2191 boost::system::error_code ec;
2192 VSOMEIP_WARNING << __func__
2193 << ": Unsupported eventgroup option ["
2194 << std::hex << (int)its_option->get_type() << "] "
2195 << its_sender.to_string(ec) << " session: "
2196 << std::hex << std::setw(4) << std::setfill('0') << its_session;
2197 if (its_ttl > 0) {
2198 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, its_clients);
2199 return;
2200 }
2201 break;
2202 }
2203 }
2204 }
2205
2206 if (entry_type_e::SUBSCRIBE_EVENTGROUP == its_type) {
2207 handle_eventgroup_subscription(its_service, its_instance,
2208 its_eventgroup, its_major, its_ttl, 0, 0,
2209 its_first_address, its_first_port, is_first_reliable,
2210 its_second_address, its_second_port, is_second_reliable,
2211 _acknowledgement, _is_stop_subscribe_subscribe,
2212 _force_initial_events, its_clients, _sd_ac_state, its_info);
2213 } else {
2214 if (entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { //this type is used for ACK and NACK messages
2215 if (its_ttl > 0) {
2216 handle_eventgroup_subscription_ack(its_service, its_instance,
2217 its_eventgroup, its_major, its_ttl, 0,
2218 its_clients, _sender,
2219 its_first_address, its_first_port);
2220 } else {
2221 handle_eventgroup_subscription_nack(its_service, its_instance, its_eventgroup,
2222 0, its_clients);
2223 }
2224 }
2225 }
2226 }
2227
2228 void
handle_eventgroup_subscription(service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,ttl_t _ttl,uint8_t _counter,uint16_t _reserved,const boost::asio::ip::address & _first_address,uint16_t _first_port,bool _is_first_reliable,const boost::asio::ip::address & _second_address,uint16_t _second_port,bool _is_second_reliable,std::shared_ptr<remote_subscription_ack> & _acknowledgement,bool _is_stop_subscribe_subscribe,bool _force_initial_events,const std::set<client_t> & _clients,const sd_acceptance_state_t & _sd_ac_state,const std::shared_ptr<eventgroupinfo> & _info)2229 service_discovery_impl::handle_eventgroup_subscription(
2230 service_t _service, instance_t _instance,
2231 eventgroup_t _eventgroup, major_version_t _major,
2232 ttl_t _ttl, uint8_t _counter, uint16_t _reserved,
2233 const boost::asio::ip::address &_first_address, uint16_t _first_port,
2234 bool _is_first_reliable,
2235 const boost::asio::ip::address &_second_address, uint16_t _second_port,
2236 bool _is_second_reliable,
2237 std::shared_ptr<remote_subscription_ack> &_acknowledgement,
2238 bool _is_stop_subscribe_subscribe, bool _force_initial_events,
2239 const std::set<client_t> &_clients,
2240 const sd_acceptance_state_t& _sd_ac_state,
2241 const std::shared_ptr<eventgroupinfo>& _info) {
2242 (void)_counter;
2243 (void)_reserved;
2244
2245 auto its_messages = _acknowledgement->get_messages();
2246
2247 #ifndef VSOMEIP_ENABLE_COMPAT
2248 bool reliablility_nack(false);
2249 if (_info) {
2250 const bool first_port_set(_first_port != ILLEGAL_PORT);
2251 const bool second_port_set(_second_port != ILLEGAL_PORT);
2252 switch (_info->get_reliability()) {
2253 case reliability_type_e::RT_UNRELIABLE:
2254 if (!(first_port_set && !_is_first_reliable)
2255 && !(second_port_set && !_is_second_reliable)) {
2256 reliablility_nack = true;
2257 }
2258 break;
2259 case reliability_type_e::RT_RELIABLE:
2260 if (!(first_port_set && _is_first_reliable)
2261 && !(second_port_set && _is_second_reliable)) {
2262 reliablility_nack = true;
2263 }
2264 break;
2265 case reliability_type_e::RT_BOTH:
2266 if (_first_port == ILLEGAL_PORT || _second_port == ILLEGAL_PORT) {
2267 reliablility_nack = true;
2268 }
2269 if (_is_first_reliable == _is_second_reliable) {
2270 reliablility_nack = true;
2271 }
2272 break;
2273 default:
2274 break;
2275 }
2276 }
2277 if (reliablility_nack && _ttl > 0) {
2278 insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
2279 boost::system::error_code ec;
2280 // TODO: Add sender and session id
2281 VSOMEIP_WARNING << __func__
2282 << ": Subscription for ["
2283 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2284 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2285 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
2286 << " not valid: Event configuration ("
2287 << (std::uint32_t)_info->get_reliability()
2288 << ") does not match the provided endpoint options: "
2289 << _first_address.to_string(ec) << ":" << std::dec << _first_port << " "
2290 << _second_address.to_string(ec) << ":" << std::dec << _second_port;
2291
2292 return;
2293 }
2294
2295 #endif
2296
2297
2298 std::shared_ptr<endpoint_definition> its_subscriber;
2299 std::shared_ptr<endpoint_definition> its_reliable;
2300 std::shared_ptr<endpoint_definition> its_unreliable;
2301
2302 // wrong major version
2303 if (_major != _info->get_major()) {
2304 // Create a temporary info object with TTL=0 --> send NACK
2305 auto its_info = std::make_shared<eventgroupinfo>(_service, _instance,
2306 _eventgroup, _major, 0, VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS);
2307 boost::system::error_code ec;
2308 // TODO: Add session id
2309 VSOMEIP_ERROR << __func__
2310 << ": Requested major version:[" << (uint32_t) _major
2311 << "] in subscription to service: ["
2312 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2313 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2314 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
2315 << " does not match with services major version:["
2316 << (uint32_t) _info->get_major() << "] subscriber: "
2317 << _first_address.to_string(ec) << ":" << std::dec << _first_port;
2318 if (_ttl > 0) {
2319 insert_subscription_ack(_acknowledgement, its_info, 0, nullptr, _clients);
2320 }
2321 return;
2322 } else {
2323 boost::asio::ip::address its_first_address, its_second_address;
2324 if (ILLEGAL_PORT != _first_port) {
2325 uint16_t its_first_port(0);
2326 its_subscriber = endpoint_definition::get(
2327 _first_address, _first_port, _is_first_reliable, _service, _instance);
2328 if (!_is_first_reliable &&
2329 _info->get_multicast(its_first_address, its_first_port) &&
2330 _info->is_sending_multicast()) { // udp multicast
2331 its_unreliable = endpoint_definition::get(
2332 its_first_address, its_first_port, false, _service, _instance);
2333 } else if (_is_first_reliable) { // tcp unicast
2334 its_reliable = its_subscriber;
2335 // check if TCP connection is established by client
2336 if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) {
2337 insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
2338 boost::system::error_code ec;
2339 // TODO: Add sender and session id
2340 VSOMEIP_ERROR << "TCP connection to target1: ["
2341 << its_reliable->get_address().to_string()
2342 << ":" << its_reliable->get_port()
2343 << "] not established for subscription to: ["
2344 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2345 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2346 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] ";
2347 return;
2348 }
2349 } else { // udp unicast
2350 its_unreliable = its_subscriber;
2351 }
2352 }
2353
2354 if (ILLEGAL_PORT != _second_port) {
2355 uint16_t its_second_port(0);
2356 its_subscriber = endpoint_definition::get(
2357 _second_address, _second_port, _is_second_reliable, _service, _instance);
2358 if (!_is_second_reliable &&
2359 _info->get_multicast(its_second_address, its_second_port) &&
2360 _info->is_sending_multicast()) { // udp multicast
2361 its_unreliable = endpoint_definition::get(
2362 its_second_address, its_second_port, false, _service, _instance);
2363 } else if (_is_second_reliable) { // tcp unicast
2364 its_reliable = its_subscriber;
2365 // check if TCP connection is established by client
2366 if (_ttl > 0 && !is_tcp_connected(_service, _instance, its_reliable)) {
2367 insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
2368 boost::system::error_code ec;
2369 // TODO: Add sender and session id
2370 VSOMEIP_ERROR << "TCP connection to target2 : ["
2371 << its_reliable->get_address().to_string()
2372 << ":" << its_reliable->get_port()
2373 << "] not established for subscription to: ["
2374 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
2375 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
2376 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] ";
2377 return;
2378 }
2379 } else { // udp unicast
2380 its_unreliable = its_subscriber;
2381 }
2382 }
2383 }
2384
2385 // check if the subscription should be rejected because of sd_acceptance_handling
2386 if (_ttl > 0 && _sd_ac_state.sd_acceptance_required_) {
2387 bool insert_nack(false);
2388 if (_first_port != ILLEGAL_PORT && !_sd_ac_state.accept_entries_
2389 && configuration_->is_protected_port(_first_address,
2390 _first_port, _is_first_reliable)) {
2391 insert_nack = true;
2392 }
2393 if (!insert_nack && _second_port != ILLEGAL_PORT
2394 && !_sd_ac_state.accept_entries_
2395 && configuration_->is_protected_port(_second_address,
2396 _second_port, _is_second_reliable)) {
2397 insert_nack = true;
2398 }
2399 if (insert_nack) {
2400 insert_subscription_ack(_acknowledgement, _info, 0, nullptr, _clients);
2401 return;
2402 }
2403 }
2404
2405 if (its_subscriber) {
2406 // Create subscription object
2407 auto its_subscription = std::make_shared<remote_subscription>();
2408 its_subscription->set_eventgroupinfo(_info);
2409 its_subscription->set_subscriber(its_subscriber);
2410 its_subscription->set_reliable(its_reliable);
2411 its_subscription->set_unreliable(its_unreliable);
2412 its_subscription->reset(_clients);
2413
2414 if (_ttl == 0) { // --> unsubscribe
2415 its_subscription->set_ttl(0);
2416 if (!_is_stop_subscribe_subscribe) {
2417 {
2418 std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
2419 pending_remote_subscriptions_[its_subscription] = _acknowledgement;
2420 _acknowledgement->add_subscription(its_subscription);
2421 }
2422 host_->on_remote_unsubscribe(its_subscription);
2423 }
2424 return;
2425 }
2426
2427 if (_force_initial_events) {
2428 its_subscription->set_force_initial_events(true);
2429 }
2430 its_subscription->set_ttl(_ttl
2431 * get_ttl_factor(_service, _instance, ttl_factor_subscriptions_));
2432
2433 {
2434 std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
2435 pending_remote_subscriptions_[its_subscription] = _acknowledgement;
2436 _acknowledgement->add_subscription(its_subscription);
2437 }
2438
2439 host_->on_remote_subscribe(its_subscription,
2440 std::bind(&service_discovery_impl::update_remote_subscription,
2441 shared_from_this(), std::placeholders::_1));
2442 }
2443 }
2444
2445 void
handle_eventgroup_subscription_nack(service_t _service,instance_t _instance,eventgroup_t _eventgroup,uint8_t _counter,const std::set<client_t> & _clients)2446 service_discovery_impl::handle_eventgroup_subscription_nack(
2447 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
2448 uint8_t _counter, const std::set<client_t> &_clients) {
2449 (void)_counter;
2450
2451 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
2452 auto found_service = subscribed_.find(_service);
2453 if (found_service != subscribed_.end()) {
2454 auto found_instance = found_service->second.find(_instance);
2455 if (found_instance != found_service->second.end()) {
2456 auto found_eventgroup = found_instance->second.find(_eventgroup);
2457 if (found_eventgroup != found_instance->second.end()) {
2458 auto its_subscription = found_eventgroup->second;
2459 for (const auto& its_client : _clients) {
2460 host_->on_subscribe_nack(its_client,
2461 _service, _instance, _eventgroup, ANY_EVENT,
2462 PENDING_SUBSCRIPTION_ID, false); // TODO: This is a dummy call...
2463 }
2464
2465
2466 if (!its_subscription->is_selective()) {
2467 auto its_reliable = its_subscription->get_endpoint(true);
2468 if (its_reliable)
2469 its_reliable->restart();
2470 }
2471 }
2472 }
2473 }
2474 }
2475
2476 void
handle_eventgroup_subscription_ack(service_t _service,instance_t _instance,eventgroup_t _eventgroup,major_version_t _major,ttl_t _ttl,uint8_t _counter,const std::set<client_t> & _clients,const boost::asio::ip::address & _sender,const boost::asio::ip::address & _address,uint16_t _port)2477 service_discovery_impl::handle_eventgroup_subscription_ack(
2478 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
2479 major_version_t _major, ttl_t _ttl, uint8_t _counter,
2480 const std::set<client_t> &_clients,
2481 const boost::asio::ip::address &_sender,
2482 const boost::asio::ip::address &_address, uint16_t _port) {
2483 (void)_major;
2484 (void)_ttl;
2485 (void)_counter;
2486
2487 std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
2488 auto found_service = subscribed_.find(_service);
2489 if (found_service != subscribed_.end()) {
2490 auto found_instance = found_service->second.find(_instance);
2491 if (found_instance != found_service->second.end()) {
2492 auto found_eventgroup = found_instance->second.find(_eventgroup);
2493 if (found_eventgroup != found_instance->second.end()) {
2494 for (const auto& its_client : _clients) {
2495 if (found_eventgroup->second->get_state(its_client)
2496 == subscription_state_e::ST_NOT_ACKNOWLEDGED) {
2497 found_eventgroup->second->set_state(its_client,
2498 subscription_state_e::ST_ACKNOWLEDGED);
2499 host_->on_subscribe_ack(its_client,
2500 _service, _instance, _eventgroup,
2501 ANY_EVENT, PENDING_SUBSCRIPTION_ID);
2502 }
2503 }
2504 if (_address.is_multicast()) {
2505 host_->on_subscribe_ack_with_multicast(
2506 _service, _instance, _sender, _address, _port);
2507 }
2508 }
2509 }
2510 }
2511 }
2512
is_tcp_connected(service_t _service,instance_t _instance,const std::shared_ptr<endpoint_definition> & its_endpoint)2513 bool service_discovery_impl::is_tcp_connected(service_t _service,
2514 instance_t _instance,
2515 const std::shared_ptr<endpoint_definition>& its_endpoint) {
2516 bool is_connected = false;
2517 std::shared_ptr<serviceinfo> its_info = host_->get_offered_service(_service,
2518 _instance);
2519 if (its_info) {
2520 //get reliable server endpoint
2521 auto its_reliable_server_endpoint = std::dynamic_pointer_cast<
2522 tcp_server_endpoint_impl>(its_info->get_endpoint(true));
2523 if (its_reliable_server_endpoint
2524 && its_reliable_server_endpoint->is_established(its_endpoint)) {
2525 is_connected = true;
2526 }
2527 }
2528 return is_connected;
2529 }
2530
2531 bool
send(const std::vector<std::shared_ptr<message_impl>> & _messages)2532 service_discovery_impl::send(
2533 const std::vector<std::shared_ptr<message_impl> > &_messages) {
2534 bool its_result(true);
2535 std::lock_guard<std::mutex> its_lock(serialize_mutex_);
2536 for (const auto &m : _messages) {
2537 if (m->has_entry()) {
2538 std::pair<session_t, bool> its_session = get_session(unicast_);
2539 m->set_session(its_session.first);
2540 m->set_reboot_flag(its_session.second);
2541 if (host_->send(VSOMEIP_SD_CLIENT, m)) {
2542 increment_session(unicast_);
2543 }
2544 } else {
2545 its_result = false;
2546 }
2547 }
2548 return its_result;
2549 }
2550
2551 bool
serialize_and_send(const std::vector<std::shared_ptr<message_impl>> & _messages,const boost::asio::ip::address & _address)2552 service_discovery_impl::serialize_and_send(
2553 const std::vector<std::shared_ptr<message_impl> > &_messages,
2554 const boost::asio::ip::address &_address) {
2555 bool its_result(true);
2556 if (!_address.is_unspecified()) {
2557 std::lock_guard<std::mutex> its_lock(serialize_mutex_);
2558 for (const auto &m : _messages) {
2559 if (m->has_entry()) {
2560 std::pair<session_t, bool> its_session = get_session(_address);
2561 m->set_session(its_session.first);
2562 m->set_reboot_flag(its_session.second);
2563
2564 if (serializer_->serialize(m.get())) {
2565 if (host_->send_via_sd(endpoint_definition::get(_address, port_,
2566 reliable_, m->get_service(), m->get_instance()),
2567 serializer_->get_data(), serializer_->get_size(),
2568 port_)) {
2569 increment_session(_address);
2570 }
2571 } else {
2572 VSOMEIP_ERROR << "service_discovery_impl::" << __func__
2573 << ": Serialization failed!";
2574 its_result = false;
2575 }
2576 serializer_->reset();
2577 } else {
2578 its_result = false;
2579 }
2580 }
2581 }
2582 return its_result;
2583 }
2584
2585 void
start_ttl_timer()2586 service_discovery_impl::start_ttl_timer() {
2587 std::lock_guard<std::mutex> its_lock(ttl_timer_mutex_);
2588 boost::system::error_code ec;
2589 ttl_timer_.expires_from_now(std::chrono::milliseconds(ttl_timer_runtime_), ec);
2590 ttl_timer_.async_wait(
2591 std::bind(&service_discovery_impl::check_ttl, shared_from_this(),
2592 std::placeholders::_1));
2593 }
2594
2595 void
stop_ttl_timer()2596 service_discovery_impl::stop_ttl_timer() {
2597 std::lock_guard<std::mutex> its_lock(ttl_timer_mutex_);
2598 boost::system::error_code ec;
2599 ttl_timer_.cancel(ec);
2600 }
2601
2602 void
check_ttl(const boost::system::error_code & _error)2603 service_discovery_impl::check_ttl(const boost::system::error_code &_error) {
2604 if (!_error) {
2605 {
2606 std::lock_guard<std::mutex> its_lock(check_ttl_mutex_);
2607 host_->update_routing_info(ttl_timer_runtime_);
2608 }
2609 start_ttl_timer();
2610 }
2611 }
2612
2613 bool
check_static_header_fields(const std::shared_ptr<const message> & _message) const2614 service_discovery_impl::check_static_header_fields(
2615 const std::shared_ptr<const message> &_message) const {
2616 if(_message->get_protocol_version() != protocol_version) {
2617 VSOMEIP_ERROR << "Invalid protocol version in SD header";
2618 return false;
2619 }
2620 if(_message->get_interface_version() != interface_version) {
2621 VSOMEIP_ERROR << "Invalid interface version in SD header";
2622 return false;
2623 }
2624 if(_message->get_message_type() != message_type) {
2625 VSOMEIP_ERROR << "Invalid message type in SD header";
2626 return false;
2627 }
2628 if(_message->get_return_code() > return_code_e::E_OK
2629 && _message->get_return_code()< return_code_e::E_UNKNOWN) {
2630 VSOMEIP_ERROR << "Invalid return code in SD header";
2631 return false;
2632 }
2633 return true;
2634 }
2635
2636 bool
check_layer_four_protocol(const std::shared_ptr<const ip_option_impl> & _ip_option) const2637 service_discovery_impl::check_layer_four_protocol(
2638 const std::shared_ptr<const ip_option_impl>& _ip_option) const {
2639 if (_ip_option->get_layer_four_protocol() == layer_four_protocol_e::UNKNOWN) {
2640 VSOMEIP_ERROR << "Invalid layer 4 protocol in IP endpoint option";
2641 return false;
2642 }
2643 return true;
2644 }
2645
2646 void
start_subscription_expiration_timer()2647 service_discovery_impl::start_subscription_expiration_timer() {
2648 std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_);
2649 start_subscription_expiration_timer_unlocked();
2650 }
2651
2652 void
start_subscription_expiration_timer_unlocked()2653 service_discovery_impl::start_subscription_expiration_timer_unlocked() {
2654 subscription_expiration_timer_.expires_at(next_subscription_expiration_);
2655 subscription_expiration_timer_.async_wait(
2656 std::bind(&service_discovery_impl::expire_subscriptions,
2657 shared_from_this(),
2658 std::placeholders::_1));
2659 }
2660
2661 void
stop_subscription_expiration_timer()2662 service_discovery_impl::stop_subscription_expiration_timer() {
2663 std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_);
2664 stop_subscription_expiration_timer_unlocked();
2665 }
2666
2667 void
stop_subscription_expiration_timer_unlocked()2668 service_discovery_impl::stop_subscription_expiration_timer_unlocked() {
2669 subscription_expiration_timer_.cancel();
2670 }
2671
2672 void
expire_subscriptions(const boost::system::error_code & _error)2673 service_discovery_impl::expire_subscriptions(
2674 const boost::system::error_code &_error) {
2675 if (!_error) {
2676 next_subscription_expiration_ = host_->expire_subscriptions(false);
2677 start_subscription_expiration_timer();
2678 }
2679 }
2680
2681 bool
check_ipv4_address(const boost::asio::ip::address & its_address) const2682 service_discovery_impl::check_ipv4_address(
2683 const boost::asio::ip::address& its_address) const {
2684 //Check unallowed ipv4 address
2685 bool is_valid = true;
2686
2687 const boost::asio::ip::address_v4::bytes_type its_unicast_address =
2688 unicast_.to_v4().to_bytes();
2689 const boost::asio::ip::address_v4::bytes_type endpoint_address =
2690 its_address.to_v4().to_bytes();
2691 const boost::asio::ip::address_v4::bytes_type its_netmask =
2692 configuration_->get_netmask().to_v4().to_bytes();
2693
2694 //same address as unicast address of DUT not allowed
2695 if (its_unicast_address == endpoint_address) {
2696 VSOMEIP_ERROR << "Subscriber's IP address is same as host's address! : "
2697 << its_address;
2698 is_valid = false;
2699 } else {
2700 const std::uint32_t self = VSOMEIP_BYTES_TO_LONG(its_unicast_address[0],
2701 its_unicast_address[1], its_unicast_address[2], its_unicast_address[3]);
2702 const std::uint32_t remote = VSOMEIP_BYTES_TO_LONG(endpoint_address[0],
2703 endpoint_address[1], endpoint_address[2], endpoint_address[3]);
2704 const std::uint32_t netmask = VSOMEIP_BYTES_TO_LONG(its_netmask[0],
2705 its_netmask[1], its_netmask[2], its_netmask[3]);
2706 if ((self & netmask) != (remote & netmask)) {
2707 VSOMEIP_ERROR<< "Subscriber's IP isn't in the same subnet as host's IP: "
2708 << its_address;
2709 is_valid = false;
2710 }
2711 }
2712 return is_valid;
2713 }
2714
2715 void
offer_service(const std::shared_ptr<serviceinfo> & _info)2716 service_discovery_impl::offer_service(const std::shared_ptr<serviceinfo> &_info) {
2717 service_t its_service = _info->get_service();
2718 service_t its_instance = _info->get_instance();
2719
2720 std::lock_guard<std::mutex> its_lock(collected_offers_mutex_);
2721 // check if offer is in map
2722 bool found(false);
2723 const auto its_service_it = collected_offers_.find(its_service);
2724 if (its_service_it != collected_offers_.end()) {
2725 const auto its_instance_it = its_service_it->second.find(its_instance);
2726 if (its_instance_it != its_service_it->second.end()) {
2727 found = true;
2728 }
2729 }
2730 if (!found) {
2731 collected_offers_[its_service][its_instance] = _info;
2732 }
2733 }
2734
2735 void
start_offer_debounce_timer(bool _first_start)2736 service_discovery_impl::start_offer_debounce_timer(bool _first_start) {
2737 std::lock_guard<std::mutex> its_lock(offer_debounce_timer_mutex_);
2738 boost::system::error_code ec;
2739 if (_first_start) {
2740 offer_debounce_timer_.expires_from_now(initial_delay_, ec);
2741 } else {
2742 offer_debounce_timer_.expires_from_now(offer_debounce_time_, ec);
2743 }
2744 if (ec) {
2745 VSOMEIP_ERROR<< "service_discovery_impl::start_offer_debounce_timer "
2746 "setting expiry time of timer failed: " << ec.message();
2747 }
2748 offer_debounce_timer_.async_wait(
2749 std::bind(&service_discovery_impl::on_offer_debounce_timer_expired,
2750 this, std::placeholders::_1));
2751 }
2752
2753
2754
2755 void
start_find_debounce_timer(bool _first_start)2756 service_discovery_impl::start_find_debounce_timer(bool _first_start) {
2757 std::lock_guard<std::mutex> its_lock(find_debounce_timer_mutex_);
2758 boost::system::error_code ec;
2759 if (_first_start) {
2760 find_debounce_timer_.expires_from_now(initial_delay_, ec);
2761 } else {
2762 find_debounce_timer_.expires_from_now(find_debounce_time_, ec);
2763 }
2764 if (ec) {
2765 VSOMEIP_ERROR<< "service_discovery_impl::start_find_debounce_timer "
2766 "setting expiry time of timer failed: " << ec.message();
2767 }
2768 find_debounce_timer_.async_wait(
2769 std::bind(
2770 &service_discovery_impl::on_find_debounce_timer_expired,
2771 this, std::placeholders::_1));
2772 }
2773
2774 // initial delay
2775 void
on_find_debounce_timer_expired(const boost::system::error_code & _error)2776 service_discovery_impl::on_find_debounce_timer_expired(
2777 const boost::system::error_code &_error) {
2778 if(_error) { // timer was canceled
2779 return;
2780 }
2781 // Only copy the accumulated requests of the initial wait phase
2782 // if the sent counter for the request is zero.
2783 requests_t repetition_phase_finds;
2784 bool new_finds(false);
2785 {
2786 std::lock_guard<std::mutex> its_lock(requested_mutex_);
2787 for (const auto& its_service : requested_) {
2788 for (const auto& its_instance : its_service.second) {
2789 if( its_instance.second->get_sent_counter() == 0) {
2790 repetition_phase_finds[its_service.first][its_instance.first] = its_instance.second;
2791 }
2792 }
2793 }
2794 if (repetition_phase_finds.size()) {
2795 new_finds = true;
2796 }
2797 }
2798
2799 if (!new_finds) {
2800 start_find_debounce_timer(false);
2801 return;
2802 }
2803
2804 // Sent out finds for the first time as initial wait phase ended
2805 std::vector<std::shared_ptr<message_impl>> its_messages;
2806 std::shared_ptr<message_impl> its_message(
2807 std::make_shared<message_impl>());
2808 its_messages.push_back(its_message);
2809 // Serialize and send FindService (increments sent counter in requested_ map)
2810 insert_find_entries(its_messages, repetition_phase_finds);
2811 send(its_messages);
2812
2813 std::chrono::milliseconds its_delay(repetitions_base_delay_);
2814 std::uint8_t its_repetitions(1);
2815
2816 std::shared_ptr<boost::asio::steady_timer> its_timer = std::make_shared<
2817 boost::asio::steady_timer>(host_->get_io());
2818 {
2819 std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
2820 find_repetition_phase_timers_[its_timer] = repetition_phase_finds;
2821 }
2822
2823 boost::system::error_code ec;
2824 its_timer->expires_from_now(its_delay, ec);
2825 if (ec) {
2826 VSOMEIP_ERROR<< "service_discovery_impl::on_find_debounce_timer_expired "
2827 "setting expiry time of timer failed: " << ec.message();
2828 }
2829 its_timer->async_wait(
2830 std::bind(
2831 &service_discovery_impl::on_find_repetition_phase_timer_expired,
2832 this, std::placeholders::_1, its_timer, its_repetitions,
2833 its_delay.count()));
2834 start_find_debounce_timer(false);
2835 }
2836
2837 void
on_offer_debounce_timer_expired(const boost::system::error_code & _error)2838 service_discovery_impl::on_offer_debounce_timer_expired(
2839 const boost::system::error_code &_error) {
2840 if(_error) { // timer was canceled
2841 return;
2842 }
2843
2844 // Copy the accumulated offers of the initial wait phase
2845 services_t repetition_phase_offers;
2846 bool new_offers(false);
2847 {
2848 std::vector<services_t::iterator> non_someip_services;
2849 std::lock_guard<std::mutex> its_lock(collected_offers_mutex_);
2850 if (collected_offers_.size()) {
2851 if (is_diagnosis_) {
2852 for (services_t::iterator its_service = collected_offers_.begin();
2853 its_service != collected_offers_.end(); its_service++) {
2854 for (const auto& its_instance : its_service->second) {
2855 if (!configuration_->is_someip(
2856 its_service->first, its_instance.first)) {
2857 non_someip_services.push_back(its_service);
2858 }
2859 }
2860 }
2861 for (auto its_service : non_someip_services) {
2862 repetition_phase_offers.insert(*its_service);
2863 collected_offers_.erase(its_service);
2864 }
2865 } else {
2866 repetition_phase_offers = collected_offers_;
2867 collected_offers_.clear();
2868 }
2869
2870 new_offers = true;
2871 }
2872 }
2873
2874 if (!new_offers) {
2875 start_offer_debounce_timer(false);
2876 return;
2877 }
2878
2879 // Sent out offers for the first time as initial wait phase ended
2880 std::vector<std::shared_ptr<message_impl>> its_messages;
2881 std::shared_ptr<message_impl> its_message(std::make_shared<message_impl>());
2882 its_messages.push_back(its_message);
2883 insert_offer_entries(its_messages, repetition_phase_offers, true);
2884
2885 // Serialize and send
2886 send(its_messages);
2887
2888 std::chrono::milliseconds its_delay(0);
2889 std::uint8_t its_repetitions(0);
2890 if (repetitions_max_) {
2891 // Start timer for repetition phase the first time
2892 // with 2^0 * repetitions_base_delay
2893 its_delay = repetitions_base_delay_;
2894 its_repetitions = 1;
2895 } else {
2896 // If repetitions_max is set to zero repetition phase is skipped,
2897 // therefore wait one cyclic offer delay before entering main phase
2898 its_delay = cyclic_offer_delay_;
2899 its_repetitions = 0;
2900 }
2901
2902 std::shared_ptr<boost::asio::steady_timer> its_timer = std::make_shared<
2903 boost::asio::steady_timer>(host_->get_io());
2904
2905 {
2906 std::lock_guard<std::mutex> its_lock(repetition_phase_timers_mutex_);
2907 repetition_phase_timers_[its_timer] = repetition_phase_offers;
2908 }
2909
2910 boost::system::error_code ec;
2911 its_timer->expires_from_now(its_delay, ec);
2912 if (ec) {
2913 VSOMEIP_ERROR<< "service_discovery_impl::on_offer_debounce_timer_expired "
2914 "setting expiry time of timer failed: " << ec.message();
2915 }
2916 its_timer->async_wait(
2917 std::bind(
2918 &service_discovery_impl::on_repetition_phase_timer_expired,
2919 this, std::placeholders::_1, its_timer, its_repetitions,
2920 its_delay.count()));
2921 start_offer_debounce_timer(false);
2922 }
2923
2924 void
on_repetition_phase_timer_expired(const boost::system::error_code & _error,const std::shared_ptr<boost::asio::steady_timer> & _timer,std::uint8_t _repetition,std::uint32_t _last_delay)2925 service_discovery_impl::on_repetition_phase_timer_expired(
2926 const boost::system::error_code &_error,
2927 const std::shared_ptr<boost::asio::steady_timer>& _timer,
2928 std::uint8_t _repetition, std::uint32_t _last_delay) {
2929 if (_error) {
2930 return;
2931 }
2932 if (_repetition == 0) {
2933 std::lock_guard<std::mutex> its_lock(repetition_phase_timers_mutex_);
2934 // We waited one cyclic offer delay, the offers can now be sent in the
2935 // main phase and the timer can be deleted
2936 move_offers_into_main_phase(_timer);
2937 } else {
2938 std::lock_guard<std::mutex> its_lock(repetition_phase_timers_mutex_);
2939 auto its_timer_pair = repetition_phase_timers_.find(_timer);
2940 if (its_timer_pair != repetition_phase_timers_.end()) {
2941 std::chrono::milliseconds new_delay(0);
2942 std::uint8_t repetition(0);
2943 bool move_to_main(false);
2944 if (_repetition <= repetitions_max_) {
2945 // Sent offers, double time to wait and start timer again.
2946
2947 new_delay = std::chrono::milliseconds(_last_delay * 2);
2948 repetition = ++_repetition;
2949 } else {
2950 // Repetition phase is now over we have to sleep one cyclic
2951 // offer delay before it's allowed to sent the offer again.
2952 // If the last offer was sent shorter than half the
2953 // configured cyclic_offer_delay_ago the offers are directly
2954 // moved into the mainphase to avoid potentially sleeping twice
2955 // the cyclic offer delay before moving the offers in to main
2956 // phase
2957 if (last_offer_shorter_half_offer_delay_ago()) {
2958 move_to_main = true;
2959 } else {
2960 new_delay = cyclic_offer_delay_;
2961 repetition = 0;
2962 }
2963 }
2964 std::vector<std::shared_ptr<message_impl>> its_messages;
2965 std::shared_ptr<message_impl> its_message(
2966 std::make_shared<message_impl>());
2967 its_messages.push_back(its_message);
2968 insert_offer_entries(its_messages, its_timer_pair->second, true);
2969
2970 // Serialize and send
2971 send(its_messages);
2972 if (move_to_main) {
2973 move_offers_into_main_phase(_timer);
2974 return;
2975 }
2976 boost::system::error_code ec;
2977 its_timer_pair->first->expires_from_now(new_delay, ec);
2978 if (ec) {
2979 VSOMEIP_ERROR <<
2980 "service_discovery_impl::on_repetition_phase_timer_expired "
2981 "setting expiry time of timer failed: " << ec.message();
2982 }
2983 its_timer_pair->first->async_wait(
2984 std::bind(
2985 &service_discovery_impl::on_repetition_phase_timer_expired,
2986 this, std::placeholders::_1, its_timer_pair->first,
2987 repetition, new_delay.count()));
2988 }
2989 }
2990 }
2991
2992
2993 void
on_find_repetition_phase_timer_expired(const boost::system::error_code & _error,const std::shared_ptr<boost::asio::steady_timer> & _timer,std::uint8_t _repetition,std::uint32_t _last_delay)2994 service_discovery_impl::on_find_repetition_phase_timer_expired(
2995 const boost::system::error_code &_error,
2996 const std::shared_ptr<boost::asio::steady_timer>& _timer,
2997 std::uint8_t _repetition, std::uint32_t _last_delay) {
2998 if (_error) {
2999 return;
3000 }
3001
3002 std::lock_guard<std::mutex> its_lock(find_repetition_phase_timers_mutex_);
3003 auto its_timer_pair = find_repetition_phase_timers_.find(_timer);
3004 if (its_timer_pair != find_repetition_phase_timers_.end()) {
3005 std::chrono::milliseconds new_delay(0);
3006 std::uint8_t repetition(0);
3007 if (_repetition <= repetitions_max_) {
3008 // Sent findService entries in one message, double time to wait and start timer again.
3009 std::vector<std::shared_ptr<message_impl>> its_messages;
3010 std::shared_ptr<message_impl> its_message(
3011 std::make_shared<message_impl>());
3012 its_messages.push_back(its_message);
3013 insert_find_entries(its_messages, its_timer_pair->second);
3014 send(its_messages);
3015 new_delay = std::chrono::milliseconds(_last_delay * 2);
3016 repetition = ++_repetition;
3017 } else {
3018 // Repetition phase is now over, erase the timer on next expiry time
3019 find_repetition_phase_timers_.erase(its_timer_pair);
3020 return;
3021 }
3022 boost::system::error_code ec;
3023 its_timer_pair->first->expires_from_now(new_delay, ec);
3024 if (ec) {
3025 VSOMEIP_ERROR << __func__
3026 << "setting expiry time of timer failed: " << ec.message();
3027 }
3028 its_timer_pair->first->async_wait(
3029 std::bind(
3030 &service_discovery_impl::on_find_repetition_phase_timer_expired,
3031 this, std::placeholders::_1, its_timer_pair->first,
3032 repetition, new_delay.count()));
3033 }
3034 }
3035
3036
3037 void
move_offers_into_main_phase(const std::shared_ptr<boost::asio::steady_timer> & _timer)3038 service_discovery_impl::move_offers_into_main_phase(
3039 const std::shared_ptr<boost::asio::steady_timer> &_timer) {
3040 // HINT: make sure to lock the repetition_phase_timers_mutex_ before calling
3041 // this function set flag on all serviceinfos bound to this timer that they
3042 // will be included in the cyclic offers from now on
3043 const auto its_timer = repetition_phase_timers_.find(_timer);
3044 if (its_timer != repetition_phase_timers_.end()) {
3045 for (const auto& its_service : its_timer->second) {
3046 for (const auto& instance : its_service.second) {
3047 instance.second->set_is_in_mainphase(true);
3048 }
3049 }
3050 repetition_phase_timers_.erase(_timer);
3051 }
3052 }
3053
3054 void
stop_offer_service(const std::shared_ptr<serviceinfo> & _info)3055 service_discovery_impl::stop_offer_service(
3056 const std::shared_ptr<serviceinfo> &_info) {
3057 std::lock_guard<std::mutex> its_lock(offer_mutex_);
3058 _info->set_ttl(0);
3059 const service_t its_service = _info->get_service();
3060 const instance_t its_instance = _info->get_instance();
3061 bool stop_offer_required(false);
3062 // Delete from initial phase offers
3063 {
3064 std::lock_guard<std::mutex> its_lock(collected_offers_mutex_);
3065 if (collected_offers_.size()) {
3066 auto its_service_it = collected_offers_.find(its_service);
3067 if (its_service_it != collected_offers_.end()) {
3068 auto its_instance_it = its_service_it->second.find(its_instance);
3069 if (its_instance_it != its_service_it->second.end()) {
3070 if (its_instance_it->second == _info) {
3071 its_service_it->second.erase(its_instance_it);
3072
3073 if (!collected_offers_[its_service].size()) {
3074 collected_offers_.erase(its_service_it);
3075 }
3076 }
3077 }
3078 }
3079 }
3080 // No need to sent out a stop offer message here as all services
3081 // instances contained in the collected offers weren't broadcasted yet
3082 }
3083
3084 // Delete from repetition phase offers
3085 {
3086 std::lock_guard<std::mutex> its_lock(repetition_phase_timers_mutex_);
3087 for (auto rpt = repetition_phase_timers_.begin();
3088 rpt != repetition_phase_timers_.end();) {
3089 auto its_service_it = rpt->second.find(its_service);
3090 if (its_service_it != rpt->second.end()) {
3091 auto its_instance_it = its_service_it->second.find(its_instance);
3092 if (its_instance_it != its_service_it->second.end()) {
3093 if (its_instance_it->second == _info) {
3094 its_service_it->second.erase(its_instance_it);
3095 stop_offer_required = true;
3096 if (!rpt->second[its_service].size()) {
3097 rpt->second.erase(its_service);
3098 }
3099 }
3100 }
3101 }
3102 if (!rpt->second.size()) {
3103 rpt = repetition_phase_timers_.erase(rpt);
3104 } else {
3105 ++rpt;
3106 }
3107 }
3108 }
3109 // Sent stop offer
3110 if(_info->is_in_mainphase() || stop_offer_required) {
3111 send_stop_offer(_info);
3112 }
3113 // sent out NACKs for all pending subscriptions
3114 // TODO: remote_subscription_not_acknowledge_all(its_service, its_instance);
3115 }
3116
3117 bool
send_stop_offer(const std::shared_ptr<serviceinfo> & _info)3118 service_discovery_impl::send_stop_offer(const std::shared_ptr<serviceinfo> &_info) {
3119
3120 if (_info->get_endpoint(false) || _info->get_endpoint(true)) {
3121 std::vector<std::shared_ptr<message_impl> > its_messages;
3122 std::shared_ptr<message_impl> its_current_message(
3123 std::make_shared<message_impl>());
3124 its_messages.push_back(its_current_message);
3125
3126 insert_offer_service(its_messages, _info);
3127
3128 // Serialize and send
3129 return send(its_messages);
3130 }
3131 return false;
3132 }
3133
3134 void
start_main_phase_timer()3135 service_discovery_impl::start_main_phase_timer() {
3136 std::lock_guard<std::mutex> its_lock(main_phase_timer_mutex_);
3137 boost::system::error_code ec;
3138 main_phase_timer_.expires_from_now(cyclic_offer_delay_);
3139 if (ec) {
3140 VSOMEIP_ERROR<< "service_discovery_impl::start_main_phase_timer "
3141 "setting expiry time of timer failed: " << ec.message();
3142 }
3143 main_phase_timer_.async_wait(
3144 std::bind(&service_discovery_impl::on_main_phase_timer_expired,
3145 this, std::placeholders::_1));
3146 }
3147
3148 void
on_main_phase_timer_expired(const boost::system::error_code & _error)3149 service_discovery_impl::on_main_phase_timer_expired(
3150 const boost::system::error_code &_error) {
3151 if (_error) {
3152 return;
3153 }
3154 send(true);
3155 start_main_phase_timer();
3156 }
3157
3158 void
send_uni_or_multicast_offerservice(const std::shared_ptr<const serviceinfo> & _info,bool _unicast_flag)3159 service_discovery_impl::send_uni_or_multicast_offerservice(
3160 const std::shared_ptr<const serviceinfo> &_info, bool _unicast_flag) {
3161 if (_unicast_flag) { // SID_SD_826
3162 if (last_offer_shorter_half_offer_delay_ago()) { // SIP_SD_89
3163 send_unicast_offer_service(_info);
3164 } else { // SIP_SD_90
3165 send_multicast_offer_service(_info);
3166 }
3167 } else { // SID_SD_826
3168 send_unicast_offer_service(_info);
3169 }
3170 }
3171
3172 bool
last_offer_shorter_half_offer_delay_ago()3173 service_discovery_impl::last_offer_shorter_half_offer_delay_ago() {
3174 // Get remaining time to next offer since last offer
3175 std::chrono::milliseconds remaining(0);
3176 {
3177 std::lock_guard<std::mutex> its_lock(main_phase_timer_mutex_);
3178 remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
3179 main_phase_timer_.expires_from_now());
3180 }
3181 if (std::chrono::milliseconds(0) > remaining) {
3182 remaining = cyclic_offer_delay_;
3183 }
3184 const std::chrono::milliseconds half_cyclic_offer_delay =
3185 cyclic_offer_delay_ / 2;
3186
3187 return remaining > half_cyclic_offer_delay;
3188 }
3189
3190 bool
check_source_address(const boost::asio::ip::address & its_source_address) const3191 service_discovery_impl::check_source_address(
3192 const boost::asio::ip::address &its_source_address) const {
3193
3194 bool is_valid = true;
3195 // Check if source address is same as nodes unicast address
3196 if (unicast_ == its_source_address) {
3197 VSOMEIP_ERROR << "Source address of message is same as DUT's unicast address! : "
3198 << its_source_address.to_string();
3199 is_valid = false;
3200 }
3201 return is_valid;
3202 }
3203
3204 void
set_diagnosis_mode(const bool _activate)3205 service_discovery_impl::set_diagnosis_mode(const bool _activate) {
3206
3207 is_diagnosis_ = _activate;
3208 }
3209
3210 bool
get_diagnosis_mode()3211 service_discovery_impl::get_diagnosis_mode() {
3212
3213 return is_diagnosis_;
3214 }
3215
3216 void
update_remote_subscription(const std::shared_ptr<remote_subscription> & _subscription)3217 service_discovery_impl::update_remote_subscription(
3218 const std::shared_ptr<remote_subscription> &_subscription) {
3219
3220 if (!_subscription->is_pending() || 0 == _subscription->get_answers()) {
3221 std::shared_ptr<remote_subscription_ack> its_ack;
3222 {
3223 std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
3224 auto found_ack = pending_remote_subscriptions_.find(_subscription);
3225 if (found_ack != pending_remote_subscriptions_.end()) {
3226 its_ack = found_ack->second;
3227 }
3228 }
3229 if (its_ack) {
3230 std::unique_lock<std::recursive_mutex> its_lock(its_ack->get_lock());
3231 update_acknowledgement(its_ack);
3232 }
3233 }
3234 }
3235
3236 void
update_acknowledgement(const std::shared_ptr<remote_subscription_ack> & _acknowledgement)3237 service_discovery_impl::update_acknowledgement(
3238 const std::shared_ptr<remote_subscription_ack> &_acknowledgement) {
3239
3240 if (_acknowledgement->is_complete()
3241 && !_acknowledgement->is_pending()
3242 && !_acknowledgement->is_done()) {
3243
3244 send_subscription_ack(_acknowledgement);
3245
3246 std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
3247 for (const auto &its_subscription : _acknowledgement->get_subscriptions())
3248 pending_remote_subscriptions_.erase(its_subscription);
3249 }
3250 }
3251
3252
3253 void
update_subscription_expiration_timer(const std::vector<std::shared_ptr<message_impl>> & _messages)3254 service_discovery_impl::update_subscription_expiration_timer(
3255 const std::vector<std::shared_ptr<message_impl> > &_messages) {
3256 std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_);
3257 const std::chrono::steady_clock::time_point now =
3258 std::chrono::steady_clock::now();
3259 stop_subscription_expiration_timer_unlocked();
3260 for (const auto &m : _messages) {
3261 for (const auto &e : m->get_entries()) {
3262 if (e && e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
3263 && e->get_ttl()) {
3264 const std::chrono::steady_clock::time_point its_expiration = now
3265 + std::chrono::seconds(e->get_ttl()
3266 * get_ttl_factor(
3267 e->get_service(), e->get_instance(),
3268 ttl_factor_subscriptions_));
3269 if (its_expiration < next_subscription_expiration_) {
3270 next_subscription_expiration_ = its_expiration;
3271 }
3272 }
3273 }
3274 }
3275 start_subscription_expiration_timer_unlocked();
3276 }
3277
3278 bool
check_stop_subscribe_subscribe(message_impl::entries_t::const_iterator _iter,message_impl::entries_t::const_iterator _end,const message_impl::options_t & _options) const3279 service_discovery_impl::check_stop_subscribe_subscribe(
3280 message_impl::entries_t::const_iterator _iter,
3281 message_impl::entries_t::const_iterator _end,
3282 const message_impl::options_t& _options) const {
3283 const message_impl::entries_t::const_iterator its_next = std::next(_iter);
3284 if ((*_iter)->get_ttl() > 0
3285 || (*_iter)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP
3286 || its_next == _end
3287 || (*its_next)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP) {
3288 return false;
3289 }
3290
3291 return (*static_cast<eventgroupentry_impl*>(_iter->get())).matches(
3292 *(static_cast<eventgroupentry_impl*>(its_next->get())), _options);
3293 }
3294
3295 bool
has_opposite(message_impl::entries_t::const_iterator _iter,message_impl::entries_t::const_iterator _end,const message_impl::options_t & _options) const3296 service_discovery_impl::has_opposite(
3297 message_impl::entries_t::const_iterator _iter,
3298 message_impl::entries_t::const_iterator _end,
3299 const message_impl::options_t &_options) const {
3300 const auto its_entry = std::dynamic_pointer_cast<eventgroupentry_impl>(*_iter);
3301 auto its_other = std::next(_iter);
3302 for (; its_other != _end; its_other++) {
3303 if ((*its_other)->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) {
3304 const auto its_other_entry
3305 = std::dynamic_pointer_cast<eventgroupentry_impl>(*its_other);
3306 if ((its_entry->get_ttl() == 0 && its_other_entry->get_ttl() > 0)
3307 || (its_entry->get_ttl() > 0 && its_other_entry->get_ttl() == 0)) {
3308 if (its_entry->matches(*(its_other_entry.get()), _options))
3309 return true;
3310 }
3311 }
3312 }
3313 return false;
3314 }
3315
3316 bool
has_same(message_impl::entries_t::const_iterator _iter,message_impl::entries_t::const_iterator _end,const message_impl::options_t & _options) const3317 service_discovery_impl::has_same(
3318 message_impl::entries_t::const_iterator _iter,
3319 message_impl::entries_t::const_iterator _end,
3320 const message_impl::options_t &_options) const {
3321 const auto its_entry = std::dynamic_pointer_cast<eventgroupentry_impl>(*_iter);
3322 auto its_other = std::next(_iter);
3323 for (; its_other != _end; its_other++) {
3324 if (its_entry->get_type() == (*its_other)->get_type()) {
3325 const auto its_other_entry
3326 = std::dynamic_pointer_cast<eventgroupentry_impl>(*its_other);
3327 if (its_entry->get_ttl() == its_other_entry->get_ttl()
3328 && its_entry->matches(*(its_other_entry.get()), _options)) {
3329 return true;
3330 }
3331 }
3332 }
3333 return false;
3334 }
3335
3336 bool
is_subscribed(const std::shared_ptr<eventgroupentry_impl> & _entry,const message_impl::options_t & _options) const3337 service_discovery_impl::is_subscribed(
3338 const std::shared_ptr<eventgroupentry_impl> &_entry,
3339 const message_impl::options_t &_options) const {
3340 const auto its_service = _entry->get_service();
3341 const auto its_instance = _entry->get_instance();
3342 auto its_info = host_->find_eventgroup(
3343 its_service, its_instance, _entry->get_eventgroup());
3344 if (its_info) {
3345 std::shared_ptr<endpoint_definition> its_reliable, its_unreliable;
3346 for (const auto& o : _options) {
3347 if (o->get_type() == option_type_e::IP4_ENDPOINT) {
3348 const auto its_endpoint_option
3349 = std::dynamic_pointer_cast<ipv4_option_impl>(o);
3350 if (its_endpoint_option) {
3351 if (its_endpoint_option->get_layer_four_protocol()
3352 == layer_four_protocol_e::TCP) {
3353 its_reliable = endpoint_definition::get(
3354 boost::asio::ip::address_v4(
3355 its_endpoint_option->get_address()),
3356 its_endpoint_option->get_port(),
3357 true,
3358 its_service, its_instance);
3359 } else if (its_endpoint_option->get_layer_four_protocol()
3360 == layer_four_protocol_e::UDP) {
3361 its_unreliable = endpoint_definition::get(
3362 boost::asio::ip::address_v4(
3363 its_endpoint_option->get_address()),
3364 its_endpoint_option->get_port(),
3365 false,
3366 its_service, its_instance);
3367 }
3368 }
3369 } else if (o->get_type() == option_type_e::IP6_ENDPOINT) {
3370 const auto its_endpoint_option
3371 = std::dynamic_pointer_cast<ipv6_option_impl>(o);
3372 if (its_endpoint_option->get_layer_four_protocol()
3373 == layer_four_protocol_e::TCP) {
3374 its_reliable = endpoint_definition::get(
3375 boost::asio::ip::address_v6(
3376 its_endpoint_option->get_address()),
3377 its_endpoint_option->get_port(),
3378 true,
3379 its_service, its_instance);
3380 } else if (its_endpoint_option->get_layer_four_protocol()
3381 == layer_four_protocol_e::UDP) {
3382 its_unreliable = endpoint_definition::get(
3383 boost::asio::ip::address_v6(
3384 its_endpoint_option->get_address()),
3385 its_endpoint_option->get_port(),
3386 false,
3387 its_service, its_instance);
3388 }
3389 }
3390 }
3391 if (its_reliable || its_unreliable) {
3392 for (const auto& its_subscription : its_info->get_remote_subscriptions()) {
3393 if ((!its_reliable || its_subscription->get_reliable() == its_reliable)
3394 && (!its_unreliable || its_subscription->get_unreliable() == its_unreliable)) {
3395 return true;
3396 }
3397 }
3398 }
3399 }
3400 return false;
3401 }
3402
3403 configuration::ttl_factor_t
get_ttl_factor(service_t _service,instance_t _instance,const configuration::ttl_map_t & _ttl_map) const3404 service_discovery_impl::get_ttl_factor(
3405 service_t _service, instance_t _instance,
3406 const configuration::ttl_map_t& _ttl_map) const {
3407 configuration::ttl_factor_t its_ttl_factor(1);
3408 auto found_service = _ttl_map.find(_service);
3409 if (found_service != _ttl_map.end()) {
3410 auto found_instance = found_service->second.find(_instance);
3411 if (found_instance != found_service->second.end()) {
3412 its_ttl_factor = found_instance->second;
3413 }
3414 }
3415 return its_ttl_factor;
3416 }
3417
3418 void
on_last_msg_received_timer_expired(const boost::system::error_code & _error)3419 service_discovery_impl::on_last_msg_received_timer_expired(
3420 const boost::system::error_code &_error) {
3421
3422 if (!_error) {
3423 // We didn't receive a multicast message within 110% of the cyclic_offer_delay_
3424 VSOMEIP_WARNING << "Didn't receive a multicast SD message for " <<
3425 std::dec << last_msg_received_timer_timeout_.count() << "ms.";
3426
3427 // Rejoin multicast group
3428 if (endpoint_ && !reliable_) {
3429 auto its_endpoint = std::dynamic_pointer_cast<
3430 udp_server_endpoint_impl>(endpoint_);
3431 if (its_endpoint)
3432 its_endpoint->join(sd_multicast_);
3433 }
3434 {
3435 boost::system::error_code ec;
3436 std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
3437 last_msg_received_timer_.expires_from_now(last_msg_received_timer_timeout_, ec);
3438 last_msg_received_timer_.async_wait(
3439 std::bind(
3440 &service_discovery_impl::on_last_msg_received_timer_expired,
3441 shared_from_this(), std::placeholders::_1));
3442 }
3443 }
3444 }
3445
3446 void
stop_last_msg_received_timer()3447 service_discovery_impl::stop_last_msg_received_timer() {
3448 std::lock_guard<std::mutex> its_lock(last_msg_received_timer_mutex_);
3449 boost::system::error_code ec;
3450 last_msg_received_timer_.cancel(ec);
3451 }
3452
3453 service_discovery_impl::remote_offer_type_e
get_remote_offer_type(service_t _service,instance_t _instance) const3454 service_discovery_impl::get_remote_offer_type(
3455 service_t _service, instance_t _instance) const {
3456 std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
3457 auto found_si = remote_offer_types_.find(std::make_pair(_service, _instance));
3458 if (found_si != remote_offer_types_.end()) {
3459 return found_si->second;
3460 }
3461 return remote_offer_type_e::UNKNOWN;
3462 }
3463
3464 service_discovery_impl::remote_offer_type_e
get_remote_offer_type(const std::shared_ptr<subscription> & _subscription) const3465 service_discovery_impl::get_remote_offer_type(
3466 const std::shared_ptr<subscription> &_subscription) const {
3467 bool has_reliable = (_subscription->get_endpoint(true) != nullptr);
3468 bool has_unreliable = (_subscription->get_endpoint(false) != nullptr);
3469
3470 return (has_reliable ?
3471 (has_unreliable ?
3472 remote_offer_type_e::RELIABLE_UNRELIABLE :
3473 remote_offer_type_e::RELIABLE) :
3474 (has_unreliable ?
3475 remote_offer_type_e::UNRELIABLE :
3476 remote_offer_type_e::UNKNOWN));
3477 }
3478
3479
3480 bool
update_remote_offer_type(service_t _service,instance_t _instance,remote_offer_type_e _offer_type,const boost::asio::ip::address & _reliable_address,std::uint16_t _reliable_port,const boost::asio::ip::address & _unreliable_address,std::uint16_t _unreliable_port)3481 service_discovery_impl::update_remote_offer_type(
3482 service_t _service, instance_t _instance,
3483 remote_offer_type_e _offer_type,
3484 const boost::asio::ip::address &_reliable_address,
3485 std::uint16_t _reliable_port,
3486 const boost::asio::ip::address &_unreliable_address,
3487 std::uint16_t _unreliable_port) {
3488 bool ret(false);
3489 std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
3490 const std::pair<service_t, instance_t> its_si_pair = std::make_pair(_service, _instance);
3491 auto found_si = remote_offer_types_.find(its_si_pair);
3492 if (found_si != remote_offer_types_.end()) {
3493 if (found_si->second != _offer_type ) {
3494 found_si->second = _offer_type;
3495 ret = true;
3496 }
3497 } else {
3498 remote_offer_types_[its_si_pair] = _offer_type;
3499 }
3500 switch (_offer_type) {
3501 case remote_offer_type_e::UNRELIABLE:
3502 remote_offers_by_ip_[_unreliable_address][std::make_pair(false,
3503 _unreliable_port)].insert(its_si_pair);
3504 break;
3505 case remote_offer_type_e::RELIABLE:
3506 remote_offers_by_ip_[_reliable_address][std::make_pair(true,
3507 _reliable_port)].insert(its_si_pair);
3508 break;
3509 case remote_offer_type_e::RELIABLE_UNRELIABLE:
3510 remote_offers_by_ip_[_unreliable_address][std::make_pair(false,
3511 _unreliable_port)].insert(its_si_pair);
3512 remote_offers_by_ip_[_unreliable_address][std::make_pair(true,
3513 _reliable_port)].insert(its_si_pair);
3514 break;
3515 case remote_offer_type_e::UNKNOWN:
3516 default:
3517 VSOMEIP_WARNING << __func__ << ": unknown offer type ["
3518 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3519 << std::hex << std::setw(4) << std::setfill('0') << _instance << "]"
3520 << _offer_type;
3521 break;
3522 }
3523 return ret;
3524 }
3525
3526 void
remove_remote_offer_type(service_t _service,instance_t _instance,const boost::asio::ip::address & _reliable_address,std::uint16_t _reliable_port,const boost::asio::ip::address & _unreliable_address,std::uint16_t _unreliable_port)3527 service_discovery_impl::remove_remote_offer_type(
3528 service_t _service, instance_t _instance,
3529 const boost::asio::ip::address &_reliable_address,
3530 std::uint16_t _reliable_port,
3531 const boost::asio::ip::address &_unreliable_address,
3532 std::uint16_t _unreliable_port) {
3533 std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
3534 const std::pair<service_t, instance_t> its_si_pair =
3535 std::make_pair(_service, _instance);
3536 remote_offer_types_.erase(its_si_pair);
3537
3538 auto delete_from_remote_offers_by_ip = [&](
3539 const boost::asio::ip::address& _address, std::uint16_t _port,
3540 bool _reliable) {
3541 const auto found_address = remote_offers_by_ip_.find(_address);
3542 if (found_address != remote_offers_by_ip_.end()) {
3543 auto found_port = found_address->second.find(
3544 std::make_pair(_reliable, _port));
3545 if (found_port != found_address->second.end()) {
3546 if (found_port->second.erase(std::make_pair(_service, _instance))) {
3547 if (found_port->second.empty()) {
3548 found_address->second.erase(found_port);
3549 if (found_address->second.empty()) {
3550 remote_offers_by_ip_.erase(found_address);
3551 }
3552 }
3553 }
3554 }
3555 }
3556 };
3557 if (_reliable_port != ILLEGAL_PORT) {
3558 delete_from_remote_offers_by_ip(_reliable_address, _reliable_port,
3559 true);
3560 }
3561 if (_unreliable_port != ILLEGAL_PORT) {
3562 delete_from_remote_offers_by_ip(_unreliable_address, _unreliable_port,
3563 false);
3564 }
3565 }
3566
remove_remote_offer_type_by_ip(const boost::asio::ip::address & _address)3567 void service_discovery_impl::remove_remote_offer_type_by_ip(
3568 const boost::asio::ip::address &_address) {
3569 remove_remote_offer_type_by_ip(_address, ANY_PORT, false);
3570 }
3571
remove_remote_offer_type_by_ip(const boost::asio::ip::address & _address,std::uint16_t _port,bool _reliable)3572 void service_discovery_impl::remove_remote_offer_type_by_ip(
3573 const boost::asio::ip::address &_address, std::uint16_t _port, bool _reliable) {
3574 std::lock_guard<std::mutex> its_lock(remote_offer_types_mutex_);
3575 const auto found_address = remote_offers_by_ip_.find(_address);
3576 if (found_address != remote_offers_by_ip_.end()) {
3577 if (_port == ANY_PORT) {
3578 for (const auto& port : found_address->second) {
3579 for (const auto& si : port.second) {
3580 remote_offer_types_.erase(si);
3581 }
3582 }
3583 remote_offers_by_ip_.erase(_address);
3584 } else {
3585 const auto its_port_reliability = std::make_pair(_reliable, _port);
3586 const auto found_port = found_address->second.find(its_port_reliability);
3587 if (found_port != found_address->second.end()) {
3588 for (const auto& si : found_port->second) {
3589 remote_offer_types_.erase(si);
3590 }
3591 found_address->second.erase(found_port);
3592 if (found_address->second.empty()) {
3593 remote_offers_by_ip_.erase(found_address);
3594 }
3595 }
3596 }
3597 }
3598 }
3599
3600 std::shared_ptr<subscription>
create_subscription(major_version_t _major,ttl_t _ttl,const std::shared_ptr<endpoint> & _reliable,const std::shared_ptr<endpoint> & _unreliable,const std::shared_ptr<eventgroupinfo> & _info)3601 service_discovery_impl::create_subscription(
3602 major_version_t _major, ttl_t _ttl,
3603 const std::shared_ptr<endpoint> &_reliable,
3604 const std::shared_ptr<endpoint> &_unreliable,
3605 const std::shared_ptr<eventgroupinfo> &_info) {
3606 auto its_subscription = std::make_shared<subscription>();
3607 its_subscription->set_major(_major);
3608 its_subscription->set_ttl(_ttl);
3609
3610 if (_reliable) {
3611 its_subscription->set_endpoint(_reliable, true);
3612 its_subscription->set_tcp_connection_established(_reliable->is_established());
3613 }
3614
3615 if (_unreliable) {
3616 its_subscription->set_endpoint(_unreliable, false);
3617 its_subscription->set_udp_connection_established(_unreliable->is_established());
3618 }
3619
3620 // check whether the eventgroup is selective
3621 its_subscription->set_selective(_info->is_selective());
3622
3623 its_subscription->set_eventgroupinfo(_info);
3624
3625 return its_subscription;
3626 }
3627
3628 void
send_subscription_ack(const std::shared_ptr<remote_subscription_ack> & _acknowledgement)3629 service_discovery_impl::send_subscription_ack(
3630 const std::shared_ptr<remote_subscription_ack> &_acknowledgement) {
3631
3632 if (_acknowledgement->is_done())
3633 return;
3634
3635 _acknowledgement->done();
3636
3637 std::uint32_t its_max_answers(1); // Must be 1 as "_acknowledgement" not
3638 // necessarily contains subscriptions
3639 bool do_not_answer(false);
3640 std::shared_ptr<remote_subscription> its_parent;
3641
3642 // Find highest number of necessary answers
3643 for (const auto& its_subscription : _acknowledgement->get_subscriptions()) {
3644 auto its_answers = its_subscription->get_answers();
3645 if (its_answers > its_max_answers) {
3646 its_max_answers = its_answers;
3647 } else if (its_answers == 0) {
3648 do_not_answer = true;
3649 its_parent = its_subscription->get_parent();
3650 }
3651 }
3652
3653 if (do_not_answer) {
3654 if (its_parent) {
3655 std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
3656 auto its_parent_ack = pending_remote_subscriptions_[its_parent];
3657 if (its_parent_ack) {
3658 for (const auto &its_subscription : its_parent_ack->get_subscriptions()) {
3659 if (its_subscription != its_parent)
3660 its_subscription->set_answers(its_subscription->get_answers() + 1);
3661 }
3662 }
3663 }
3664 return;
3665 }
3666
3667 // send messages
3668 for (std::uint32_t i = 0; i < its_max_answers; i++) {
3669 for (const auto &its_subscription : _acknowledgement->get_subscriptions()) {
3670 if (i < its_subscription->get_answers()) {
3671 if (its_subscription->get_ttl() > 0) {
3672 auto its_info = its_subscription->get_eventgroupinfo();
3673 if (its_info) {
3674 std::set<client_t> its_acked;
3675 std::set<client_t> its_nacked;
3676 for (const auto& its_client : its_subscription->get_clients()) {
3677 if (its_subscription->get_client_state(its_client)
3678 == remote_subscription_state_e::SUBSCRIPTION_ACKED) {
3679 its_acked.insert(its_client);
3680 } else {
3681 its_nacked.insert(its_client);
3682 }
3683 }
3684
3685 if (0 < its_acked.size()) {
3686 insert_subscription_ack(_acknowledgement, its_info,
3687 its_subscription->get_ttl(),
3688 its_subscription->get_subscriber(), its_acked);
3689 }
3690
3691 if (0 < its_nacked.size()) {
3692 insert_subscription_ack(_acknowledgement, its_info,
3693 0,
3694 its_subscription->get_subscriber(), its_nacked);
3695 }
3696 }
3697 }
3698 }
3699 }
3700
3701 auto its_messages = _acknowledgement->get_messages();
3702 serialize_and_send(its_messages, _acknowledgement->get_target_address());
3703 update_subscription_expiration_timer(its_messages);
3704 }
3705
3706 std::this_thread::yield();
3707
3708 // We might need to send initial events
3709 for (const auto &its_subscription : _acknowledgement->get_subscriptions()) {
3710 // Assumption: We do _NOT_ need to check whether this is a child
3711 // subscription, as this only applies to selective events, which
3712 // are owned by exclusive event groups.
3713 if (its_subscription->get_ttl() > 0
3714 && its_subscription->is_initial()) {
3715 its_subscription->set_initial(false);
3716 auto its_info = its_subscription->get_eventgroupinfo();
3717 if (its_info) {
3718 its_info->send_initial_events(
3719 its_subscription->get_reliable(),
3720 its_subscription->get_unreliable());
3721 }
3722 }
3723 }
3724 }
3725
3726 void
add_entry_data(std::vector<std::shared_ptr<message_impl>> & _messages,const entry_data_t & _data)3727 service_discovery_impl::add_entry_data(
3728 std::vector<std::shared_ptr<message_impl> > &_messages,
3729 const entry_data_t &_data) {
3730 auto its_current_message = _messages.back();
3731 const auto is_fitting = its_current_message->add_entry_data(
3732 _data.entry_, _data.options_, _data.other_);
3733 if (!is_fitting) {
3734 its_current_message = std::make_shared<message_impl>();
3735 (void)its_current_message->add_entry_data(
3736 _data.entry_, _data.options_, _data.other_);
3737 _messages.push_back(its_current_message);
3738 }
3739 }
3740
3741 void
add_entry_data_to_remote_subscription_ack_msg(const std::shared_ptr<remote_subscription_ack> & _acknowledgement,const entry_data_t & _data)3742 service_discovery_impl::add_entry_data_to_remote_subscription_ack_msg(
3743 const std::shared_ptr<remote_subscription_ack>& _acknowledgement,
3744 const entry_data_t &_data) {
3745 auto its_current_message = _acknowledgement->get_current_message();
3746 const auto is_fitting = its_current_message->add_entry_data(
3747 _data.entry_, _data.options_, _data.other_);
3748 if (!is_fitting) {
3749 its_current_message = _acknowledgement->add_message();
3750 (void)its_current_message->add_entry_data(
3751 _data.entry_, _data.options_, _data.other_);
3752 }
3753 }
3754
3755 void
register_sd_acceptance_handler(sd_acceptance_handler_t _handler)3756 service_discovery_impl::register_sd_acceptance_handler(
3757 sd_acceptance_handler_t _handler) {
3758 sd_acceptance_handler_ = _handler;
3759 }
3760
3761 void
register_reboot_notification_handler(reboot_notification_handler_t _handler)3762 service_discovery_impl::register_reboot_notification_handler(
3763 reboot_notification_handler_t _handler) {
3764 reboot_notification_handler_ = _handler;
3765 }
3766
get_eventgroup_reliability(service_t _service,instance_t _instance,eventgroup_t _eventgroup,const std::shared_ptr<subscription> & _subscription)3767 reliability_type_e service_discovery_impl::get_eventgroup_reliability(
3768 service_t _service, instance_t _instance, eventgroup_t _eventgroup,
3769 const std::shared_ptr<subscription>& _subscription) {
3770 reliability_type_e its_reliability = reliability_type_e::RT_UNKNOWN;
3771 auto its_info = _subscription->get_eventgroupinfo().lock();
3772 if (its_info) {
3773 its_reliability = its_info->get_reliability();
3774 if (its_reliability == reliability_type_e::RT_UNKNOWN
3775 && its_info->is_reliability_auto_mode()) {
3776 // fallback: determine how service is offered
3777 // and update reliability type of eventgroup
3778 switch (get_remote_offer_type(_service, _instance)) {
3779 case remote_offer_type_e::RELIABLE:
3780 its_reliability = reliability_type_e::RT_RELIABLE;
3781 break;
3782 case remote_offer_type_e::UNRELIABLE:
3783 its_reliability = reliability_type_e::RT_UNRELIABLE;
3784 break;
3785 case remote_offer_type_e::RELIABLE_UNRELIABLE:
3786 its_reliability = reliability_type_e::RT_BOTH;
3787 break;
3788 default:
3789 ;
3790 }
3791 VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't determine eventgroup reliability type for ["
3792 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3793 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
3794 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
3795 << " using reliability type: "
3796 << std::hex << std::setw(4) << std::setfill('0') << (uint16_t) its_reliability;
3797 its_info->set_reliability(its_reliability);
3798 }
3799 } else {
3800 VSOMEIP_WARNING << "sd::" << __func__ << ": couldn't lock eventgroupinfo ["
3801 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3802 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
3803 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "] ";
3804 auto its_eg_info = host_->find_eventgroup(_service, _instance, _eventgroup);
3805 if (its_eg_info) {
3806 _subscription->set_eventgroupinfo(its_eg_info);
3807 its_reliability = its_eg_info->get_reliability();
3808 }
3809 }
3810
3811 if (its_reliability == reliability_type_e::RT_UNKNOWN) {
3812 VSOMEIP_WARNING << "sd::" << __func__ << ": eventgroup reliability type is unknown ["
3813 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
3814 << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
3815 << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
3816 }
3817
3818 return its_reliability;
3819 }
3820
3821
3822 } // namespace sd
3823 } // namespace vsomeip_v3
3824