1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <iomanip>
7 #include <sstream>
8 
9 #include <vsomeip/constants.hpp>
10 #include <vsomeip/defines.hpp>
11 #include <vsomeip/message.hpp>
12 #include <vsomeip/payload.hpp>
13 #include <vsomeip/runtime.hpp>
14 #include <vsomeip/internal/logger.hpp>
15 
16 #include "../include/event.hpp"
17 #include "../include/routing_manager.hpp"
18 #include "../../message/include/payload_impl.hpp"
19 
20 #include "../../endpoints/include/endpoint_definition.hpp"
21 
22 namespace vsomeip_v3 {
23 
event(routing_manager * _routing,bool _is_shadow)24 event::event(routing_manager *_routing, bool _is_shadow) :
25         routing_(_routing),
26         message_(runtime::get()->create_notification()),
27         type_(event_type_e::ET_EVENT),
28         cycle_timer_(_routing->get_io()),
29         cycle_(std::chrono::milliseconds::zero()),
30         change_resets_cycle_(false),
31         is_updating_on_change_(true),
32         is_set_(false),
33         is_provided_(false),
34         is_shadow_(_is_shadow),
35         is_cache_placeholder_(false),
36         epsilon_change_func_(std::bind(&event::compare, this,
37                 std::placeholders::_1, std::placeholders::_2)),
38         reliability_(reliability_type_e::RT_UNKNOWN) {
39 }
40 
get_service() const41 service_t event::get_service() const {
42     return (message_->get_service());
43 }
44 
set_service(service_t _service)45 void event::set_service(service_t _service) {
46     message_->set_service(_service);
47 }
48 
get_instance() const49 instance_t event::get_instance() const {
50     return (message_->get_instance());
51 }
52 
set_instance(instance_t _instance)53 void event::set_instance(instance_t _instance) {
54     message_->set_instance(_instance);
55 }
56 
get_version() const57 major_version_t event::get_version() const {
58     return message_->get_interface_version();
59 }
60 
set_version(major_version_t _major)61 void event::set_version(major_version_t _major) {
62     message_->set_interface_version(_major);
63 }
64 
get_event() const65 event_t event::get_event() const {
66     return (message_->get_method());
67 }
68 
set_event(event_t _event)69 void event::set_event(event_t _event) {
70     message_->set_method(_event);
71 }
72 
get_type() const73 event_type_e event::get_type() const {
74     return (type_);
75 }
76 
set_type(const event_type_e _type)77 void event::set_type(const event_type_e _type) {
78     type_ = _type;
79 }
80 
is_field() const81 bool event::is_field() const {
82     return (type_ == event_type_e::ET_FIELD);
83 }
84 
is_provided() const85 bool event::is_provided() const {
86     return (is_provided_);
87 }
88 
set_provided(bool _is_provided)89 void event::set_provided(bool _is_provided) {
90     is_provided_ = _is_provided;
91 }
92 
is_set() const93 bool event::is_set() const {
94     return is_set_;
95 }
96 
get_payload() const97 const std::shared_ptr<payload> event::get_payload() const {
98     std::lock_guard<std::mutex> its_lock(mutex_);
99     return (message_->get_payload());
100 }
101 
set_payload_dont_notify(const std::shared_ptr<payload> & _payload)102 bool event::set_payload_dont_notify(const std::shared_ptr<payload> &_payload) {
103     std::lock_guard<std::mutex> its_lock(mutex_);
104     if (is_cache_placeholder_) {
105         reset_payload(_payload);
106         is_set_ = true;
107     } else {
108         if (set_payload_helper(_payload, false)) {
109             reset_payload(_payload);
110         } else {
111             return false;
112         }
113     }
114     return true;
115 }
116 
set_payload(const std::shared_ptr<payload> & _payload,bool _force)117 void event::set_payload(const std::shared_ptr<payload> &_payload, bool _force) {
118     std::lock_guard<std::mutex> its_lock(mutex_);
119     if (is_provided_) {
120         if (set_payload_helper(_payload, _force)) {
121             reset_payload(_payload);
122             if (is_updating_on_change_) {
123                 if (change_resets_cycle_)
124                     stop_cycle();
125 
126                 notify();
127 
128                 if (change_resets_cycle_)
129                     start_cycle();
130             }
131         }
132     } else {
133         VSOMEIP_INFO << "Can't set payload for event "
134                 << std::hex << std::setw(4) << std::setfill('0')
135                 << get_service() << "." << get_instance() << "." << get_event()
136                 << " as it isn't provided";
137     }
138 }
139 
set_payload(const std::shared_ptr<payload> & _payload,client_t _client,bool _force)140 void event::set_payload(const std::shared_ptr<payload> &_payload, client_t _client,
141             bool _force) {
142     std::lock_guard<std::mutex> its_lock(mutex_);
143     if (is_provided_) {
144         if (set_payload_helper(_payload, _force)) {
145             reset_payload(_payload);
146             if (is_updating_on_change_) {
147                 notify_one_unlocked(_client);
148             }
149         }
150     } else {
151         VSOMEIP_INFO << "Can't set payload for event "
152                 << std::hex << std::setw(4) << std::setfill('0')
153                 << get_service() << "." << get_instance() << "." << get_event()
154                 << ". It isn't provided";
155     }
156 }
157 
set_payload(const std::shared_ptr<payload> & _payload,const client_t _client,const std::shared_ptr<endpoint_definition> & _target,bool _force)158 void event::set_payload(const std::shared_ptr<payload> &_payload,
159         const client_t _client,
160         const std::shared_ptr<endpoint_definition>& _target,
161         bool _force) {
162     std::lock_guard<std::mutex> its_lock(mutex_);
163     if (is_provided_) {
164         if (set_payload_helper(_payload, _force)) {
165             reset_payload(_payload);
166             if (is_updating_on_change_) {
167                 notify_one_unlocked(_client, _target);
168             }
169         }
170     } else {
171         VSOMEIP_INFO << "Can't set payload for event "
172                 << std::hex << std::setw(4) << std::setfill('0')
173                 << get_service() << "." << get_instance() << "." << get_event()
174                 << ". It isn't provided";
175     }
176 }
177 
set_payload_notify_pending(const std::shared_ptr<payload> & _payload)178 bool event::set_payload_notify_pending(const std::shared_ptr<payload> &_payload) {
179     std::lock_guard<std::mutex> its_lock(mutex_);
180     if (!is_set_ && is_provided_) {
181         reset_payload(_payload);
182 
183         // Send pending initial events.
184         for (const auto &its_target : pending_) {
185             message_->set_session(routing_->get_session());
186             routing_->send_to(VSOMEIP_ROUTING_CLIENT,
187                     its_target, message_);
188         }
189         pending_.clear();
190 
191         return true;
192     }
193 
194     return false;
195 }
196 
unset_payload(bool _force)197 void event::unset_payload(bool _force) {
198     std::lock_guard<std::mutex> its_lock(mutex_);
199     if (_force) {
200         is_set_ = false;
201         stop_cycle();
202         message_->set_payload(std::make_shared<payload_impl>());
203     } else {
204         if (is_provided_) {
205             is_set_ = false;
206             stop_cycle();
207             message_->set_payload(std::make_shared<payload_impl>());
208         }
209     }
210 }
211 
set_update_cycle(std::chrono::milliseconds & _cycle)212 void event::set_update_cycle(std::chrono::milliseconds &_cycle) {
213     if (is_provided_) {
214         std::lock_guard<std::mutex> its_lock(mutex_);
215         stop_cycle();
216         cycle_ = _cycle;
217         start_cycle();
218     }
219 }
220 
set_change_resets_cycle(bool _change_resets_cycle)221 void event::set_change_resets_cycle(bool _change_resets_cycle) {
222     change_resets_cycle_ = _change_resets_cycle;
223 }
224 
set_update_on_change(bool _is_active)225 void event::set_update_on_change(bool _is_active) {
226     if (is_provided_) {
227         is_updating_on_change_ = _is_active;
228     }
229 }
230 
set_epsilon_change_function(const epsilon_change_func_t & _epsilon_change_func)231 void event::set_epsilon_change_function(const epsilon_change_func_t &_epsilon_change_func) {
232     if (_epsilon_change_func) {
233         std::lock_guard<std::mutex> its_lock(mutex_);
234         epsilon_change_func_ = _epsilon_change_func;
235     }
236 }
237 
get_eventgroups() const238 const std::set<eventgroup_t> event::get_eventgroups() const {
239     std::set<eventgroup_t> its_eventgroups;
240     {
241         std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
242         for (const auto& e : eventgroups_) {
243             its_eventgroups.insert(e.first);
244         }
245     }
246     return its_eventgroups;
247 }
248 
get_eventgroups(client_t _client) const249 std::set<eventgroup_t> event::get_eventgroups(client_t _client) const {
250     std::set<eventgroup_t> its_eventgroups;
251 
252     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
253     for (auto e : eventgroups_) {
254         if (e.second.find(_client) != e.second.end())
255             its_eventgroups.insert(e.first);
256     }
257     return its_eventgroups;
258 }
259 
add_eventgroup(eventgroup_t _eventgroup)260 void event::add_eventgroup(eventgroup_t _eventgroup) {
261     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
262     if (eventgroups_.find(_eventgroup) == eventgroups_.end())
263         eventgroups_[_eventgroup] = std::set<client_t>();
264 }
265 
set_eventgroups(const std::set<eventgroup_t> & _eventgroups)266 void event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) {
267     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
268     for (auto e : _eventgroups)
269         eventgroups_[e] = std::set<client_t>();
270 }
271 
update_cbk(boost::system::error_code const & _error)272 void event::update_cbk(boost::system::error_code const &_error) {
273     if (!_error) {
274         std::lock_guard<std::mutex> its_lock(mutex_);
275         cycle_timer_.expires_from_now(cycle_);
276         notify();
277         auto its_handler =
278                 std::bind(&event::update_cbk, shared_from_this(),
279                         std::placeholders::_1);
280         cycle_timer_.async_wait(its_handler);
281     }
282 }
283 
notify()284 void event::notify() {
285     if (is_set_) {
286         message_->set_session(routing_->get_session());
287         routing_->send(VSOMEIP_ROUTING_CLIENT, message_);
288     } else {
289         VSOMEIP_INFO << __func__
290                 << ": Notifying "
291                 << std::hex << std::setw(4) << std::setfill('0')
292                 << get_service() << "." << get_instance() << "." << get_event()
293                 << " failed. Event payload not (yet) set!";
294     }
295 }
296 
notify_one(client_t _client,const std::shared_ptr<endpoint_definition> & _target)297 void event::notify_one(client_t _client,
298         const std::shared_ptr<endpoint_definition> &_target) {
299     if (_target) {
300         std::lock_guard<std::mutex> its_lock(mutex_);
301         notify_one_unlocked(_client, _target);
302     } else {
303         VSOMEIP_WARNING << __func__
304                 << ": Notifying "
305                 << std::hex << std::setw(4) << std::setfill('0')
306                 << get_service() << "." << get_instance() << "." << get_event()
307                 << " failed. Target undefined";
308     }
309 }
310 
notify_one_unlocked(client_t _client,const std::shared_ptr<endpoint_definition> & _target)311 void event::notify_one_unlocked(client_t _client,
312         const std::shared_ptr<endpoint_definition> &_target) {
313     if (_target) {
314         if (is_set_) {
315             message_->set_session(routing_->get_session());
316             routing_->send_to(_client, _target, message_);
317         } else {
318             VSOMEIP_INFO << __func__
319                     << ": Notifying "
320                     << std::hex << std::setw(4) << std::setfill('0')
321                     << get_service() << "." << get_instance() << "." << get_event()
322                     << " failed. Event payload not (yet) set!";
323             pending_.insert(_target);
324         }
325     } else {
326         VSOMEIP_WARNING << __func__
327                 << ": Notifying "
328                 << std::hex << std::setw(4) << std::setfill('0')
329                 << get_service() << "." << get_instance() << "." << get_event()
330                 << " failed. Target undefined";
331     }
332 }
333 
notify_one(client_t _client)334 void event::notify_one(client_t _client) {
335     std::lock_guard<std::mutex> its_lock(mutex_);
336     notify_one_unlocked(_client);
337 }
338 
notify_one_unlocked(client_t _client)339 void event::notify_one_unlocked(client_t _client) {
340     if (is_set_) {
341         message_->set_session(routing_->get_session());
342         routing_->send(_client, message_);
343     } else {
344         VSOMEIP_INFO << __func__
345                 << ": Notifying "
346                 << std::hex << std::setw(4) << std::setfill('0')
347                 << get_service() << "." << get_instance() << "." << get_event()
348                 << " to client " << _client
349                 << " failed. Event payload not set!";
350     }
351 }
352 
set_payload_helper(const std::shared_ptr<payload> & _payload,bool _force)353 bool event::set_payload_helper(const std::shared_ptr<payload> &_payload, bool _force) {
354     std::shared_ptr<payload> its_payload = message_->get_payload();
355     bool is_change(type_ != event_type_e::ET_FIELD);
356     if (!is_change) {
357         is_change = _force || epsilon_change_func_(its_payload, _payload);
358     }
359     return is_change;
360 }
361 
reset_payload(const std::shared_ptr<payload> & _payload)362 void event::reset_payload(const std::shared_ptr<payload> &_payload) {
363     std::shared_ptr<payload> its_new_payload
364         = runtime::get()->create_payload(
365               _payload->get_data(), _payload->get_length());
366     message_->set_payload(its_new_payload);
367 
368     if (!is_set_)
369         start_cycle();
370 
371     is_set_ = true;
372 }
373 
add_ref(client_t _client,bool _is_provided)374 void event::add_ref(client_t _client, bool _is_provided) {
375     std::lock_guard<std::mutex> its_lock(refs_mutex_);
376     auto its_client = refs_.find(_client);
377     if (its_client == refs_.end()) {
378         refs_[_client][_is_provided] = 1;
379     } else {
380         auto its_provided = its_client->second.find(_is_provided);
381         if (its_provided == its_client->second.end()) {
382             refs_[_client][_is_provided] = 1;
383         } else {
384             its_provided->second++;
385         }
386     }
387 }
388 
remove_ref(client_t _client,bool _is_provided)389 void event::remove_ref(client_t _client, bool _is_provided) {
390     std::lock_guard<std::mutex> its_lock(refs_mutex_);
391     auto its_client = refs_.find(_client);
392     if (its_client != refs_.end()) {
393         auto its_provided = its_client->second.find(_is_provided);
394         if (its_provided != its_client->second.end()) {
395             its_provided->second--;
396             if (0 == its_provided->second) {
397                 its_client->second.erase(_is_provided);
398                 if (0 == its_client->second.size()) {
399                     refs_.erase(_client);
400                 }
401             }
402         }
403     }
404 }
405 
has_ref()406 bool event::has_ref() {
407     std::lock_guard<std::mutex> its_lock(refs_mutex_);
408     return refs_.size() != 0;
409 }
410 
add_subscriber(eventgroup_t _eventgroup,client_t _client,bool _force)411 bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _force) {
412     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
413     bool ret = false;
414     if (_force // remote events managed by rm_impl
415             || is_provided_ // events provided by rm_proxies
416             || is_shadow_ // local events managed by rm_impl
417             || is_cache_placeholder_) {
418         ret = eventgroups_[_eventgroup].insert(_client).second;
419     } else {
420         VSOMEIP_WARNING << __func__ << ": Didnt' insert client "
421                 << std::hex << std::setw(4) << std::setfill('0')
422                 << _client
423                 << " to eventgroup "
424                 << std::hex << std::setw(4) << std::setfill('0')
425                 << get_service() << "." << get_instance() << "."
426                 << _eventgroup;
427     }
428     return ret;
429 }
430 
remove_subscriber(eventgroup_t _eventgroup,client_t _client)431 void event::remove_subscriber(eventgroup_t _eventgroup, client_t _client) {
432     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
433     auto find_eventgroup = eventgroups_.find(_eventgroup);
434     if (find_eventgroup != eventgroups_.end())
435         find_eventgroup->second.erase(_client);
436 }
437 
has_subscriber(eventgroup_t _eventgroup,client_t _client)438 bool event::has_subscriber(eventgroup_t _eventgroup, client_t _client) {
439     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
440     auto find_eventgroup = eventgroups_.find(_eventgroup);
441     if (find_eventgroup != eventgroups_.end()) {
442         if (_client == ANY_CLIENT) {
443             return (find_eventgroup->second.size() > 0);
444         } else {
445             return (find_eventgroup->second.find(_client)
446                     != find_eventgroup->second.end());
447         }
448     }
449     return false;
450 }
451 
get_subscribers()452 std::set<client_t> event::get_subscribers() {
453     std::set<client_t> its_subscribers;
454     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
455     for (const auto &e : eventgroups_)
456         its_subscribers.insert(e.second.begin(), e.second.end());
457     return its_subscribers;
458 }
459 
clear_subscribers()460 void event::clear_subscribers() {
461     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
462     for (auto &e : eventgroups_)
463         e.second.clear();
464 }
465 
has_ref(client_t _client,bool _is_provided)466 bool event::has_ref(client_t _client, bool _is_provided) {
467     std::lock_guard<std::mutex> its_lock(refs_mutex_);
468     auto its_client = refs_.find(_client);
469     if (its_client != refs_.end()) {
470         auto its_provided = its_client->second.find(_is_provided);
471         if (its_provided != its_client->second.end()) {
472             if(its_provided->second > 0) {
473                 return true;
474             }
475         }
476     }
477     return false;
478 }
479 
is_shadow() const480 bool event::is_shadow() const {
481     return is_shadow_;
482 }
483 
set_shadow(bool _shadow)484 void event::set_shadow(bool _shadow) {
485     is_shadow_ = _shadow;
486 }
487 
is_cache_placeholder() const488 bool event::is_cache_placeholder() const {
489     return is_cache_placeholder_;
490 }
491 
set_cache_placeholder(bool _is_cache_place_holder)492 void event::set_cache_placeholder(bool _is_cache_place_holder) {
493     is_cache_placeholder_ = _is_cache_place_holder;
494 }
495 
start_cycle()496 void event::start_cycle() {
497     if (std::chrono::milliseconds::zero() != cycle_) {
498         cycle_timer_.expires_from_now(cycle_);
499         auto its_handler =
500                 std::bind(&event::update_cbk, shared_from_this(),
501                         std::placeholders::_1);
502         cycle_timer_.async_wait(its_handler);
503     }
504 }
505 
stop_cycle()506 void event::stop_cycle() {
507     if (std::chrono::milliseconds::zero() != cycle_) {
508         boost::system::error_code ec;
509         cycle_timer_.cancel(ec);
510     }
511 }
512 
compare(const std::shared_ptr<payload> & _lhs,const std::shared_ptr<payload> & _rhs) const513 bool event::compare(const std::shared_ptr<payload> &_lhs,
514         const std::shared_ptr<payload> &_rhs) const {
515     bool is_change = (_lhs->get_length() != _rhs->get_length());
516     if (!is_change) {
517         std::size_t its_pos = 0;
518         const byte_t *its_old_data = _lhs->get_data();
519         const byte_t *its_new_data = _rhs->get_data();
520         while (!is_change && its_pos < _lhs->get_length()) {
521             is_change = (*its_old_data++ != *its_new_data++);
522             its_pos++;
523         }
524     }
525     return is_change;
526 }
527 
get_subscribers(eventgroup_t _eventgroup)528 std::set<client_t> event::get_subscribers(eventgroup_t _eventgroup) {
529     std::set<client_t> its_subscribers;
530     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
531     auto found_eventgroup = eventgroups_.find(_eventgroup);
532     if (found_eventgroup != eventgroups_.end()) {
533         its_subscribers = found_eventgroup->second;
534     }
535     return its_subscribers;
536 }
537 
is_subscribed(client_t _client)538 bool event::is_subscribed(client_t _client) {
539     std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
540     for (const auto &egp : eventgroups_) {
541         if (egp.second.find(_client) != egp.second.end()) {
542             return true;
543         }
544     }
545     return false;
546 }
547 
548 reliability_type_e
get_reliability() const549 event::get_reliability() const {
550     return reliability_;
551 }
552 
553 void
set_reliability(const reliability_type_e _reliability)554 event::set_reliability(const reliability_type_e _reliability) {
555     reliability_ = _reliability;
556 }
557 
558 void
remove_pending(const std::shared_ptr<endpoint_definition> & _target)559 event::remove_pending(const std::shared_ptr<endpoint_definition> &_target) {
560     std::lock_guard<std::mutex> its_lock(mutex_);
561     pending_.erase(_target);
562 }
563 
564 } // namespace vsomeip_v3
565