1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <algorithm>
7 #include <iomanip>
8
9 #include <vsomeip/constants.hpp>
10 #include <vsomeip/internal/logger.hpp>
11
12 #include "../include/eventgroupinfo.hpp"
13 #include "../include/event.hpp"
14 #include "../include/remote_subscription.hpp"
15 #include "../../endpoints/include/endpoint_definition.hpp"
16
17 namespace vsomeip_v3 {
18
eventgroupinfo()19 eventgroupinfo::eventgroupinfo()
20 : service_(0),
21 instance_(0),
22 eventgroup_(0),
23 major_(DEFAULT_MAJOR),
24 ttl_(DEFAULT_TTL),
25 port_(ILLEGAL_PORT),
26 threshold_(0),
27 id_(PENDING_SUBSCRIPTION_ID),
28 reliability_(reliability_type_e::RT_UNKNOWN),
29 reliability_auto_mode_(false),
30 max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) {
31 }
32
eventgroupinfo(const service_t _service,const instance_t _instance,const eventgroup_t _eventgroup,const major_version_t _major,const ttl_t _ttl,const uint8_t _max_remote_subscribers)33 eventgroupinfo::eventgroupinfo(
34 const service_t _service, const instance_t _instance,
35 const eventgroup_t _eventgroup, const major_version_t _major,
36 const ttl_t _ttl, const uint8_t _max_remote_subscribers)
37 : service_(_service),
38 instance_(_instance),
39 eventgroup_(_eventgroup),
40 major_(_major),
41 ttl_(_ttl),
42 port_(ILLEGAL_PORT),
43 threshold_(0),
44 id_(PENDING_SUBSCRIPTION_ID),
45 reliability_(reliability_type_e::RT_UNKNOWN),
46 reliability_auto_mode_(false),
47 max_remote_subscribers_(_max_remote_subscribers) {
48 }
49
~eventgroupinfo()50 eventgroupinfo::~eventgroupinfo() {
51 }
52
get_service() const53 service_t eventgroupinfo::get_service() const {
54 return service_;
55 }
56
set_service(const service_t _service)57 void eventgroupinfo::set_service(const service_t _service) {
58 service_ = _service;
59 }
60
get_instance() const61 instance_t eventgroupinfo::get_instance() const {
62 return instance_;
63 }
64
set_instance(const instance_t _instance)65 void eventgroupinfo::set_instance(const instance_t _instance) {
66 instance_ = _instance;
67 }
68
get_eventgroup() const69 eventgroup_t eventgroupinfo::get_eventgroup() const {
70 return eventgroup_;
71 }
72
set_eventgroup(const eventgroup_t _eventgroup)73 void eventgroupinfo::set_eventgroup(const eventgroup_t _eventgroup) {
74 eventgroup_ = _eventgroup;
75 }
76
get_major() const77 major_version_t eventgroupinfo::get_major() const {
78 return major_;
79 }
80
set_major(const major_version_t _major)81 void eventgroupinfo::set_major(const major_version_t _major) {
82 major_ = _major;
83 }
84
get_ttl() const85 ttl_t eventgroupinfo::get_ttl() const {
86 return ttl_;
87 }
88
set_ttl(const ttl_t _ttl)89 void eventgroupinfo::set_ttl(const ttl_t _ttl) {
90 ttl_ = _ttl;
91 }
92
is_multicast() const93 bool eventgroupinfo::is_multicast() const {
94 std::lock_guard<std::mutex> its_lock(address_mutex_);
95 return address_.is_multicast();
96 }
97
is_sending_multicast() const98 bool eventgroupinfo::is_sending_multicast() const {
99 return (is_multicast() &&
100 threshold_ != 0 &&
101 get_unreliable_target_count() >= threshold_);
102 }
103
get_multicast(boost::asio::ip::address & _address,uint16_t & _port) const104 bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address,
105 uint16_t &_port) const {
106 std::lock_guard<std::mutex> its_lock(address_mutex_);
107 if (address_.is_multicast()) {
108 _address = address_;
109 _port = port_;
110 return true;
111 }
112 return false;
113 }
114
set_multicast(const boost::asio::ip::address & _address,uint16_t _port)115 void eventgroupinfo::set_multicast(const boost::asio::ip::address &_address,
116 uint16_t _port) {
117 std::lock_guard<std::mutex> its_lock(address_mutex_);
118 address_ = _address;
119 port_ = _port;
120 }
121
get_events() const122 const std::set<std::shared_ptr<event> > eventgroupinfo::get_events() const {
123 std::lock_guard<std::mutex> its_lock(events_mutex_);
124 return events_;
125 }
126
add_event(const std::shared_ptr<event> & _event)127 void eventgroupinfo::add_event(const std::shared_ptr<event>& _event) {
128 std::lock_guard<std::mutex> its_lock(events_mutex_);
129 events_.insert(_event);
130
131 if (!reliability_auto_mode_ &&
132 _event->get_reliability() == reliability_type_e::RT_UNKNOWN) {
133 reliability_auto_mode_ = true;
134 return;
135 }
136
137 switch (_event->get_reliability()) {
138 case reliability_type_e::RT_RELIABLE:
139 if (reliability_ == reliability_type_e::RT_UNRELIABLE) {
140 reliability_ = reliability_type_e::RT_BOTH;
141 } else if (reliability_ != reliability_type_e::RT_BOTH) {
142 reliability_ = reliability_type_e::RT_RELIABLE;
143 }
144 break;
145 case reliability_type_e::RT_UNRELIABLE:
146 if (reliability_ == reliability_type_e::RT_RELIABLE) {
147 reliability_ = reliability_type_e::RT_BOTH;
148 } else if (reliability_ != reliability_type_e::RT_BOTH) {
149 reliability_ = reliability_type_e::RT_UNRELIABLE;
150 }
151 break;
152 case reliability_type_e::RT_BOTH:
153 reliability_ = reliability_type_e::RT_BOTH;
154 break;
155 default:
156 ;
157 }
158 }
159
remove_event(const std::shared_ptr<event> & _event)160 void eventgroupinfo::remove_event(const std::shared_ptr<event>& _event) {
161 std::lock_guard<std::mutex> its_lock(events_mutex_);
162 events_.erase(_event);
163 }
164
get_reliability() const165 reliability_type_e eventgroupinfo::get_reliability() const {
166 return reliability_;
167 }
168
set_reliability(reliability_type_e _reliability)169 void eventgroupinfo::set_reliability(reliability_type_e _reliability) {
170 reliability_ = _reliability;
171 }
172
is_reliability_auto_mode() const173 bool eventgroupinfo::is_reliability_auto_mode() const {
174 return reliability_auto_mode_;
175 }
176
177 uint32_t
get_unreliable_target_count() const178 eventgroupinfo::get_unreliable_target_count() const {
179 uint32_t its_count(0);
180
181 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
182 for (const auto &s : subscriptions_) {
183 auto its_subscription = s.second;
184 if (!its_subscription->get_parent()
185 && its_subscription->get_unreliable()) {
186 its_count++;
187 }
188 }
189
190 return its_count;
191 }
192
get_threshold() const193 uint8_t eventgroupinfo::get_threshold() const {
194 return threshold_;
195 }
196
set_threshold(uint8_t _threshold)197 void eventgroupinfo::set_threshold(uint8_t _threshold) {
198 threshold_ = _threshold;
199 }
200
201 std::set<std::shared_ptr<remote_subscription> >
get_remote_subscriptions() const202 eventgroupinfo::get_remote_subscriptions() const {
203 std::set<std::shared_ptr<remote_subscription> > its_subscriptions;
204
205 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
206 for (const auto &i : subscriptions_)
207 its_subscriptions.insert(i.second);
208
209 return its_subscriptions;
210 }
211
212 bool
update_remote_subscription(const std::shared_ptr<remote_subscription> & _subscription,const std::chrono::steady_clock::time_point & _expiration,std::set<client_t> & _changed,remote_subscription_id_t & _id,const bool _is_subscribe)213 eventgroupinfo::update_remote_subscription(
214 const std::shared_ptr<remote_subscription> &_subscription,
215 const std::chrono::steady_clock::time_point &_expiration,
216 std::set<client_t> &_changed, remote_subscription_id_t &_id,
217 const bool _is_subscribe) {
218
219 bool its_result(false);
220 std::shared_ptr<endpoint_definition> its_subscriber;
221 std::set<std::shared_ptr<event> > its_events;
222
223 {
224 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
225
226 for (const auto& its_item : subscriptions_) {
227 if (its_item.second->equals(_subscription)) {
228 // update existing subscription
229 _changed = its_item.second->update(
230 _subscription->get_clients(), _expiration, _is_subscribe);
231 _id = its_item.second->get_id();
232
233 // Copy acknowledgment states from existing subscription
234 for (const auto& its_client : _subscription->get_clients()) {
235 const auto its_state = its_item.second->get_client_state(its_client);
236 if (_is_subscribe &&
237 its_state == remote_subscription_state_e::SUBSCRIPTION_UNKNOWN) {
238 _subscription->set_client_state(its_client,
239 remote_subscription_state_e::SUBSCRIPTION_PENDING);
240 _changed.insert(its_client);
241 } else {
242 _subscription->set_client_state(its_client, its_state);
243 }
244 }
245
246 if (_is_subscribe) {
247 if (!_changed.empty()) {
248 // New clients:
249 // Let this be a child subscription
250 _subscription->set_parent(its_item.second);
251 update_id();
252 _subscription->set_id(id_);
253 subscriptions_[id_] = _subscription;
254 } else {
255 if (!_subscription->is_pending()) {
256 if (!_subscription->force_initial_events()) {
257 _subscription->set_initial(false);
258 }
259 } else {
260 its_item.second->set_answers(
261 its_item.second->get_answers() + 1);
262 _subscription->set_parent(its_item.second);
263 _subscription->set_answers(0);
264 }
265 }
266 } else {
267 if (its_item.second->is_pending()) {
268 its_subscriber = its_item.second->get_subscriber();
269 }
270 }
271
272 its_result = true;
273 break;
274 }
275 }
276 }
277
278 if (its_subscriber) {
279 {
280 // Build set of events first to avoid having to
281 // hold the "events_mutex_" in parallel to the internal event mutexes.
282 std::lock_guard<std::mutex> its_lock(events_mutex_);
283 for (const auto &its_event : events_)
284 its_events.insert(its_event);
285 }
286 for (const auto &its_event : its_events)
287 its_event->remove_pending(its_subscriber);
288 }
289
290 return (its_result);
291 }
292
293 bool
is_remote_subscription_limit_reached(const std::shared_ptr<remote_subscription> & _subscription)294 eventgroupinfo::is_remote_subscription_limit_reached(
295 const std::shared_ptr<remote_subscription> &_subscription) {
296 bool limit_reached(false);
297
298 if (subscriptions_.size() <= max_remote_subscribers_) {
299 return false;
300 }
301
302 boost::asio::ip::address its_address;
303 if (_subscription->get_ip_address(its_address)) {
304 auto find_address = remote_subscribers_count_.find(its_address);
305 if (find_address != remote_subscribers_count_.end()) {
306 if (find_address->second > max_remote_subscribers_) {
307 VSOMEIP_WARNING << ": remote subscriber limit [" << std::dec
308 << (uint32_t)max_remote_subscribers_ << "] to ["
309 << std::hex << std::setw(4) << std::setfill('0') << service_ << "."
310 << std::hex << std::setw(4) << std::setfill('0') << instance_ << "."
311 << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "]"
312 << " reached for remote address: " << its_address.to_string()
313 << " rejecting subscription!";
314 return true;
315 }
316 }
317 }
318 return limit_reached;
319 }
320
321 remote_subscription_id_t
add_remote_subscription(const std::shared_ptr<remote_subscription> & _subscription)322 eventgroupinfo::add_remote_subscription(
323 const std::shared_ptr<remote_subscription> &_subscription) {
324 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
325
326 update_id();
327
328 _subscription->set_id(id_);
329 subscriptions_[id_] = _subscription;
330
331 boost::asio::ip::address its_address;
332 if (_subscription->get_ip_address(its_address)) {
333 remote_subscribers_count_[its_address]++;
334 }
335 return id_;
336 }
337
338 std::shared_ptr<remote_subscription>
get_remote_subscription(const remote_subscription_id_t _id)339 eventgroupinfo::get_remote_subscription(
340 const remote_subscription_id_t _id) {
341 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
342
343 auto find_subscription = subscriptions_.find(_id);
344 if (find_subscription != subscriptions_.end())
345 return find_subscription->second;
346
347 return nullptr;
348 }
349
350 void
remove_remote_subscription(const remote_subscription_id_t _id)351 eventgroupinfo::remove_remote_subscription(
352 const remote_subscription_id_t _id) {
353 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
354
355 auto find_subscription = subscriptions_.find(_id);
356 if (find_subscription != subscriptions_.end()) {
357 boost::asio::ip::address its_address;
358 if (find_subscription->second->get_ip_address(its_address)) {
359 auto find_address = remote_subscribers_count_.find(its_address);
360 if (find_address != remote_subscribers_count_.end()) {
361 if(find_address->second != 0) {
362 find_address->second--;
363 }
364 }
365 }
366 }
367
368 subscriptions_.erase(_id);
369 }
370
371 void
clear_remote_subscriptions()372 eventgroupinfo::clear_remote_subscriptions() {
373 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
374 subscriptions_.clear();
375 remote_subscribers_count_.clear();
376 }
377
378 std::set<std::shared_ptr<endpoint_definition> >
get_unicast_targets() const379 eventgroupinfo::get_unicast_targets() const {
380 std::set<std::shared_ptr<endpoint_definition>> its_targets;
381
382 std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
383 for (const auto &s : subscriptions_) {
384 const auto its_reliable = s.second->get_reliable();
385 if (its_reliable)
386 its_targets.insert(its_reliable);
387 const auto its_unreliable = s.second->get_unreliable();
388 if (its_unreliable)
389 its_targets.insert(its_unreliable);
390 }
391
392 return its_targets;
393 }
394
395 std::set<std::shared_ptr<endpoint_definition> >
get_multicast_targets() const396 eventgroupinfo::get_multicast_targets() const {
397 std::set<std::shared_ptr<endpoint_definition>> its_targets;
398 return its_targets;
399 }
400
is_selective() const401 bool eventgroupinfo::is_selective() const {
402 // Selective eventgroups always contain a single event
403 std::lock_guard<std::mutex> its_lock(events_mutex_);
404 if (events_.size() != 1)
405 return false;
406
407 return ((*events_.begin())->get_type()
408 == event_type_e::ET_SELECTIVE_EVENT);
409 }
410
411 void
update_id()412 eventgroupinfo::update_id() {
413 id_++;
414 if (id_ == PENDING_SUBSCRIPTION_ID)
415 id_ = 1;
416 }
417
418 void
send_initial_events(const std::shared_ptr<endpoint_definition> & _reliable,const std::shared_ptr<endpoint_definition> & _unreliable) const419 eventgroupinfo::send_initial_events(
420 const std::shared_ptr<endpoint_definition> &_reliable,
421 const std::shared_ptr<endpoint_definition> &_unreliable) const {
422
423 std::set<std::shared_ptr<event> > its_reliable_events, its_unreliable_events;
424
425 // Build sets of reliable/unreliable events first to avoid having to
426 // hold the "events_mutex_" in parallel to the internal event mutexes.
427 {
428 std::lock_guard<std::mutex> its_lock(events_mutex_);
429 for (const auto &its_event : events_) {
430 if (its_event && its_event->get_type() == event_type_e::ET_FIELD) {
431 auto its_reliability = its_event->get_reliability();
432 #ifdef VSOMEIP_ENABLE_COMPAT
433 if (its_reliability == reliability_type_e::RT_UNKNOWN) {
434 if (_reliable) {
435 if (_unreliable) {
436 its_reliability = reliability_type_e::RT_BOTH;
437 } else {
438 its_reliability = reliability_type_e::RT_RELIABLE;
439 }
440 } else if (_unreliable) {
441 its_reliability = reliability_type_e::RT_UNRELIABLE;
442 }
443 }
444 #endif
445 switch (its_reliability) {
446 case reliability_type_e::RT_RELIABLE:
447 its_reliable_events.insert(its_event);
448 break;
449 case reliability_type_e::RT_UNRELIABLE:
450 its_unreliable_events.insert(its_event);
451 break;
452 case reliability_type_e::RT_BOTH:
453 its_reliable_events.insert(its_event);
454 its_unreliable_events.insert(its_event);
455 break;
456 default:
457 VSOMEIP_WARNING << __func__ << "Event reliability unknown: ["
458 << std::hex << std::setw(4) << std::setfill('0') << service_ << "."
459 << std::hex << std::setw(4) << std::setfill('0') << instance_ << "."
460 << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "."
461 << std::hex << std::setw(4) << std::setfill('0') << its_event->get_event() << "]";
462 }
463 }
464 }
465 }
466
467 // Send events
468 for (const auto &its_event : its_reliable_events)
469 its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _reliable);
470
471 for (const auto &its_event : its_unreliable_events)
472 its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _unreliable);
473 }
474
get_max_remote_subscribers() const475 uint8_t eventgroupinfo::get_max_remote_subscribers() const {
476 return max_remote_subscribers_;
477 }
478
set_max_remote_subscribers(uint8_t _max_remote_subscribers)479 void eventgroupinfo::set_max_remote_subscribers(uint8_t _max_remote_subscribers) {
480 max_remote_subscribers_ = _max_remote_subscribers;
481 }
482
483 } // namespace vsomeip_v3
484