1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <algorithm>
7 #include <iomanip>
8 
9 #include <vsomeip/constants.hpp>
10 #include <vsomeip/internal/logger.hpp>
11 
12 #include "../include/eventgroupinfo.hpp"
13 #include "../include/event.hpp"
14 #include "../include/remote_subscription.hpp"
15 #include "../../endpoints/include/endpoint_definition.hpp"
16 
17 namespace vsomeip_v3 {
18 
eventgroupinfo()19 eventgroupinfo::eventgroupinfo()
20     : service_(0),
21       instance_(0),
22       eventgroup_(0),
23       major_(DEFAULT_MAJOR),
24       ttl_(DEFAULT_TTL),
25       port_(ILLEGAL_PORT),
26       threshold_(0),
27       id_(PENDING_SUBSCRIPTION_ID),
28       reliability_(reliability_type_e::RT_UNKNOWN),
29       reliability_auto_mode_(false),
30       max_remote_subscribers_(VSOMEIP_DEFAULT_MAX_REMOTE_SUBSCRIBERS) {
31 }
32 
eventgroupinfo(const service_t _service,const instance_t _instance,const eventgroup_t _eventgroup,const major_version_t _major,const ttl_t _ttl,const uint8_t _max_remote_subscribers)33 eventgroupinfo::eventgroupinfo(
34         const service_t _service, const instance_t _instance,
35         const eventgroup_t _eventgroup, const major_version_t _major,
36         const ttl_t _ttl, const uint8_t _max_remote_subscribers)
37     : service_(_service),
38       instance_(_instance),
39       eventgroup_(_eventgroup),
40       major_(_major),
41       ttl_(_ttl),
42       port_(ILLEGAL_PORT),
43       threshold_(0),
44       id_(PENDING_SUBSCRIPTION_ID),
45       reliability_(reliability_type_e::RT_UNKNOWN),
46       reliability_auto_mode_(false),
47       max_remote_subscribers_(_max_remote_subscribers) {
48 }
49 
~eventgroupinfo()50 eventgroupinfo::~eventgroupinfo() {
51 }
52 
get_service() const53 service_t eventgroupinfo::get_service() const {
54     return service_;
55 }
56 
set_service(const service_t _service)57 void eventgroupinfo::set_service(const service_t _service) {
58     service_ = _service;
59 }
60 
get_instance() const61 instance_t eventgroupinfo::get_instance() const {
62     return instance_;
63 }
64 
set_instance(const instance_t _instance)65 void eventgroupinfo::set_instance(const instance_t _instance) {
66     instance_ = _instance;
67 }
68 
get_eventgroup() const69 eventgroup_t eventgroupinfo::get_eventgroup() const {
70     return eventgroup_;
71 }
72 
set_eventgroup(const eventgroup_t _eventgroup)73 void eventgroupinfo::set_eventgroup(const eventgroup_t _eventgroup) {
74     eventgroup_ = _eventgroup;
75 }
76 
get_major() const77 major_version_t eventgroupinfo::get_major() const {
78     return major_;
79 }
80 
set_major(const major_version_t _major)81 void eventgroupinfo::set_major(const major_version_t _major) {
82     major_ = _major;
83 }
84 
get_ttl() const85 ttl_t eventgroupinfo::get_ttl() const {
86     return ttl_;
87 }
88 
set_ttl(const ttl_t _ttl)89 void eventgroupinfo::set_ttl(const ttl_t _ttl) {
90     ttl_ = _ttl;
91 }
92 
is_multicast() const93 bool eventgroupinfo::is_multicast() const {
94     std::lock_guard<std::mutex> its_lock(address_mutex_);
95     return address_.is_multicast();
96 }
97 
is_sending_multicast() const98 bool eventgroupinfo::is_sending_multicast() const {
99     return (is_multicast() &&
100             threshold_ != 0 &&
101             get_unreliable_target_count() >= threshold_);
102 }
103 
get_multicast(boost::asio::ip::address & _address,uint16_t & _port) const104 bool eventgroupinfo::get_multicast(boost::asio::ip::address &_address,
105         uint16_t &_port) const {
106     std::lock_guard<std::mutex> its_lock(address_mutex_);
107     if (address_.is_multicast()) {
108         _address = address_;
109         _port = port_;
110         return true;
111     }
112     return false;
113 }
114 
set_multicast(const boost::asio::ip::address & _address,uint16_t _port)115 void eventgroupinfo::set_multicast(const boost::asio::ip::address &_address,
116         uint16_t _port) {
117     std::lock_guard<std::mutex> its_lock(address_mutex_);
118     address_ = _address;
119     port_ = _port;
120 }
121 
get_events() const122 const std::set<std::shared_ptr<event> > eventgroupinfo::get_events() const {
123     std::lock_guard<std::mutex> its_lock(events_mutex_);
124     return events_;
125 }
126 
add_event(const std::shared_ptr<event> & _event)127 void eventgroupinfo::add_event(const std::shared_ptr<event>& _event) {
128     std::lock_guard<std::mutex> its_lock(events_mutex_);
129     events_.insert(_event);
130 
131     if (!reliability_auto_mode_ &&
132             _event->get_reliability() == reliability_type_e::RT_UNKNOWN) {
133         reliability_auto_mode_ = true;
134         return;
135     }
136 
137     switch (_event->get_reliability()) {
138     case reliability_type_e::RT_RELIABLE:
139         if (reliability_ == reliability_type_e::RT_UNRELIABLE) {
140             reliability_ = reliability_type_e::RT_BOTH;
141         } else if (reliability_ != reliability_type_e::RT_BOTH) {
142             reliability_ = reliability_type_e::RT_RELIABLE;
143         }
144         break;
145     case reliability_type_e::RT_UNRELIABLE:
146         if (reliability_ == reliability_type_e::RT_RELIABLE) {
147             reliability_ = reliability_type_e::RT_BOTH;
148         } else if (reliability_ != reliability_type_e::RT_BOTH) {
149             reliability_ = reliability_type_e::RT_UNRELIABLE;
150         }
151         break;
152     case reliability_type_e::RT_BOTH:
153         reliability_ = reliability_type_e::RT_BOTH;
154         break;
155     default:
156         ;
157     }
158 }
159 
remove_event(const std::shared_ptr<event> & _event)160 void eventgroupinfo::remove_event(const std::shared_ptr<event>& _event) {
161     std::lock_guard<std::mutex> its_lock(events_mutex_);
162     events_.erase(_event);
163 }
164 
get_reliability() const165 reliability_type_e eventgroupinfo::get_reliability() const {
166     return reliability_;
167 }
168 
set_reliability(reliability_type_e _reliability)169 void eventgroupinfo::set_reliability(reliability_type_e _reliability) {
170     reliability_ = _reliability;
171 }
172 
is_reliability_auto_mode() const173 bool eventgroupinfo::is_reliability_auto_mode() const {
174     return reliability_auto_mode_;
175 }
176 
177 uint32_t
get_unreliable_target_count() const178 eventgroupinfo::get_unreliable_target_count() const {
179     uint32_t its_count(0);
180 
181     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
182     for (const auto &s : subscriptions_) {
183         auto its_subscription = s.second;
184         if (!its_subscription->get_parent()
185                 && its_subscription->get_unreliable()) {
186             its_count++;
187         }
188     }
189 
190     return its_count;
191 }
192 
get_threshold() const193 uint8_t eventgroupinfo::get_threshold() const {
194     return threshold_;
195 }
196 
set_threshold(uint8_t _threshold)197 void eventgroupinfo::set_threshold(uint8_t _threshold) {
198     threshold_ = _threshold;
199 }
200 
201 std::set<std::shared_ptr<remote_subscription> >
get_remote_subscriptions() const202 eventgroupinfo::get_remote_subscriptions() const {
203     std::set<std::shared_ptr<remote_subscription> > its_subscriptions;
204 
205     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
206     for (const auto &i : subscriptions_)
207         its_subscriptions.insert(i.second);
208 
209     return its_subscriptions;
210 }
211 
212 bool
update_remote_subscription(const std::shared_ptr<remote_subscription> & _subscription,const std::chrono::steady_clock::time_point & _expiration,std::set<client_t> & _changed,remote_subscription_id_t & _id,const bool _is_subscribe)213 eventgroupinfo::update_remote_subscription(
214         const std::shared_ptr<remote_subscription> &_subscription,
215         const std::chrono::steady_clock::time_point &_expiration,
216         std::set<client_t> &_changed, remote_subscription_id_t &_id,
217         const bool _is_subscribe) {
218 
219     bool its_result(false);
220     std::shared_ptr<endpoint_definition> its_subscriber;
221     std::set<std::shared_ptr<event> > its_events;
222 
223     {
224         std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
225 
226         for (const auto& its_item : subscriptions_) {
227             if (its_item.second->equals(_subscription)) {
228                 // update existing subscription
229                 _changed = its_item.second->update(
230                     _subscription->get_clients(), _expiration, _is_subscribe);
231                 _id = its_item.second->get_id();
232 
233                 // Copy acknowledgment states from existing subscription
234                 for (const auto& its_client : _subscription->get_clients()) {
235                     const auto its_state = its_item.second->get_client_state(its_client);
236                     if (_is_subscribe &&
237                             its_state == remote_subscription_state_e::SUBSCRIPTION_UNKNOWN) {
238                         _subscription->set_client_state(its_client,
239                                 remote_subscription_state_e::SUBSCRIPTION_PENDING);
240                         _changed.insert(its_client);
241                     } else {
242                         _subscription->set_client_state(its_client, its_state);
243                     }
244                 }
245 
246                 if (_is_subscribe) {
247                     if (!_changed.empty()) {
248                         // New clients:
249                         // Let this be a child subscription
250                         _subscription->set_parent(its_item.second);
251                         update_id();
252                         _subscription->set_id(id_);
253                         subscriptions_[id_] = _subscription;
254                     } else {
255                         if (!_subscription->is_pending()) {
256                             if (!_subscription->force_initial_events()) {
257                                 _subscription->set_initial(false);
258                             }
259                         } else {
260                             its_item.second->set_answers(
261                                     its_item.second->get_answers() + 1);
262                             _subscription->set_parent(its_item.second);
263                             _subscription->set_answers(0);
264                         }
265                     }
266                 } else {
267                     if (its_item.second->is_pending()) {
268                         its_subscriber = its_item.second->get_subscriber();
269                     }
270                 }
271 
272                 its_result = true;
273                 break;
274             }
275         }
276     }
277 
278     if (its_subscriber) {
279         {
280             // Build set of events first to avoid having to
281             // hold the "events_mutex_" in parallel to the internal event mutexes.
282             std::lock_guard<std::mutex> its_lock(events_mutex_);
283             for (const auto &its_event : events_)
284                 its_events.insert(its_event);
285         }
286         for (const auto &its_event : its_events)
287             its_event->remove_pending(its_subscriber);
288     }
289 
290     return (its_result);
291 }
292 
293 bool
is_remote_subscription_limit_reached(const std::shared_ptr<remote_subscription> & _subscription)294 eventgroupinfo::is_remote_subscription_limit_reached(
295         const std::shared_ptr<remote_subscription> &_subscription) {
296     bool limit_reached(false);
297 
298     if (subscriptions_.size() <= max_remote_subscribers_) {
299         return false;
300     }
301 
302     boost::asio::ip::address its_address;
303     if (_subscription->get_ip_address(its_address)) {
304         auto find_address = remote_subscribers_count_.find(its_address);
305         if (find_address != remote_subscribers_count_.end()) {
306             if (find_address->second > max_remote_subscribers_) {
307                 VSOMEIP_WARNING << ": remote subscriber limit [" << std::dec
308                         << (uint32_t)max_remote_subscribers_ << "] to ["
309                         << std::hex << std::setw(4) << std::setfill('0') << service_ << "."
310                         << std::hex << std::setw(4) << std::setfill('0') << instance_ << "."
311                         << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "]"
312                         << " reached for remote address: " << its_address.to_string()
313                         << " rejecting subscription!";
314                 return true;
315             }
316         }
317     }
318     return limit_reached;
319 }
320 
321 remote_subscription_id_t
add_remote_subscription(const std::shared_ptr<remote_subscription> & _subscription)322 eventgroupinfo::add_remote_subscription(
323         const std::shared_ptr<remote_subscription> &_subscription) {
324     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
325 
326     update_id();
327 
328     _subscription->set_id(id_);
329     subscriptions_[id_] = _subscription;
330 
331     boost::asio::ip::address its_address;
332     if (_subscription->get_ip_address(its_address)) {
333         remote_subscribers_count_[its_address]++;
334     }
335     return id_;
336 }
337 
338 std::shared_ptr<remote_subscription>
get_remote_subscription(const remote_subscription_id_t _id)339 eventgroupinfo::get_remote_subscription(
340         const remote_subscription_id_t _id) {
341     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
342 
343     auto find_subscription = subscriptions_.find(_id);
344     if (find_subscription != subscriptions_.end())
345         return find_subscription->second;
346 
347     return nullptr;
348 }
349 
350 void
remove_remote_subscription(const remote_subscription_id_t _id)351 eventgroupinfo::remove_remote_subscription(
352         const remote_subscription_id_t _id) {
353     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
354 
355     auto find_subscription = subscriptions_.find(_id);
356     if (find_subscription != subscriptions_.end()) {
357         boost::asio::ip::address its_address;
358         if (find_subscription->second->get_ip_address(its_address)) {
359             auto find_address = remote_subscribers_count_.find(its_address);
360             if (find_address != remote_subscribers_count_.end()) {
361                 if(find_address->second != 0) {
362                     find_address->second--;
363                 }
364             }
365         }
366     }
367 
368     subscriptions_.erase(_id);
369 }
370 
371 void
clear_remote_subscriptions()372 eventgroupinfo::clear_remote_subscriptions() {
373     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
374     subscriptions_.clear();
375     remote_subscribers_count_.clear();
376 }
377 
378 std::set<std::shared_ptr<endpoint_definition> >
get_unicast_targets() const379 eventgroupinfo::get_unicast_targets() const {
380     std::set<std::shared_ptr<endpoint_definition>> its_targets;
381 
382     std::lock_guard<std::mutex> its_lock(subscriptions_mutex_);
383     for (const auto &s : subscriptions_) {
384         const auto its_reliable = s.second->get_reliable();
385         if (its_reliable)
386             its_targets.insert(its_reliable);
387         const auto its_unreliable = s.second->get_unreliable();
388         if (its_unreliable)
389             its_targets.insert(its_unreliable);
390     }
391 
392     return its_targets;
393 }
394 
395 std::set<std::shared_ptr<endpoint_definition> >
get_multicast_targets() const396 eventgroupinfo::get_multicast_targets() const {
397     std::set<std::shared_ptr<endpoint_definition>> its_targets;
398     return its_targets;
399 }
400 
is_selective() const401 bool eventgroupinfo::is_selective() const {
402     // Selective eventgroups always contain a single event
403     std::lock_guard<std::mutex> its_lock(events_mutex_);
404     if (events_.size() != 1)
405         return false;
406 
407     return ((*events_.begin())->get_type()
408             == event_type_e::ET_SELECTIVE_EVENT);
409 }
410 
411 void
update_id()412 eventgroupinfo::update_id() {
413     id_++;
414     if (id_ == PENDING_SUBSCRIPTION_ID)
415         id_ = 1;
416 }
417 
418 void
send_initial_events(const std::shared_ptr<endpoint_definition> & _reliable,const std::shared_ptr<endpoint_definition> & _unreliable) const419 eventgroupinfo::send_initial_events(
420         const std::shared_ptr<endpoint_definition> &_reliable,
421         const std::shared_ptr<endpoint_definition> &_unreliable) const {
422 
423     std::set<std::shared_ptr<event> > its_reliable_events, its_unreliable_events;
424 
425     // Build sets of reliable/unreliable events first to avoid having to
426     // hold the "events_mutex_" in parallel to the internal event mutexes.
427     {
428         std::lock_guard<std::mutex> its_lock(events_mutex_);
429         for (const auto &its_event : events_) {
430             if (its_event && its_event->get_type() == event_type_e::ET_FIELD) {
431                 auto its_reliability = its_event->get_reliability();
432 #ifdef VSOMEIP_ENABLE_COMPAT
433                 if (its_reliability == reliability_type_e::RT_UNKNOWN) {
434                     if (_reliable) {
435                         if (_unreliable) {
436                             its_reliability = reliability_type_e::RT_BOTH;
437                         } else {
438                             its_reliability = reliability_type_e::RT_RELIABLE;
439                         }
440                     } else if (_unreliable) {
441                         its_reliability = reliability_type_e::RT_UNRELIABLE;
442                     }
443                 }
444 #endif
445                 switch (its_reliability) {
446                 case reliability_type_e::RT_RELIABLE:
447                     its_reliable_events.insert(its_event);
448                     break;
449                 case reliability_type_e::RT_UNRELIABLE:
450                     its_unreliable_events.insert(its_event);
451                     break;
452                 case reliability_type_e::RT_BOTH:
453                     its_reliable_events.insert(its_event);
454                     its_unreliable_events.insert(its_event);
455                     break;
456                 default:
457                     VSOMEIP_WARNING << __func__ << "Event reliability unknown: ["
458                         << std::hex << std::setw(4) << std::setfill('0') << service_ << "."
459                         << std::hex << std::setw(4) << std::setfill('0') << instance_ << "."
460                         << std::hex << std::setw(4) << std::setfill('0') << eventgroup_ << "."
461                         << std::hex << std::setw(4) << std::setfill('0') << its_event->get_event() << "]";
462                 }
463             }
464         }
465     }
466 
467     // Send events
468     for (const auto &its_event : its_reliable_events)
469         its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _reliable);
470 
471     for (const auto &its_event : its_unreliable_events)
472         its_event->notify_one(VSOMEIP_ROUTING_CLIENT, _unreliable);
473 }
474 
get_max_remote_subscribers() const475 uint8_t eventgroupinfo::get_max_remote_subscribers() const {
476     return max_remote_subscribers_;
477 }
478 
set_max_remote_subscribers(uint8_t _max_remote_subscribers)479 void eventgroupinfo::set_max_remote_subscribers(uint8_t _max_remote_subscribers) {
480     max_remote_subscribers_ = _max_remote_subscribers;
481 }
482 
483 }  // namespace vsomeip_v3
484