1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <iomanip>
7 #include <sstream>
8
9 #include <vsomeip/constants.hpp>
10 #include <vsomeip/defines.hpp>
11 #include <vsomeip/message.hpp>
12 #include <vsomeip/payload.hpp>
13 #include <vsomeip/runtime.hpp>
14 #include <vsomeip/internal/logger.hpp>
15
16 #include "../include/event.hpp"
17 #include "../include/routing_manager.hpp"
18 #include "../../message/include/payload_impl.hpp"
19
20 #include "../../endpoints/include/endpoint_definition.hpp"
21
22 namespace vsomeip_v3 {
23
event(routing_manager * _routing,bool _is_shadow)24 event::event(routing_manager *_routing, bool _is_shadow) :
25 routing_(_routing),
26 message_(runtime::get()->create_notification()),
27 type_(event_type_e::ET_EVENT),
28 cycle_timer_(_routing->get_io()),
29 cycle_(std::chrono::milliseconds::zero()),
30 change_resets_cycle_(false),
31 is_updating_on_change_(true),
32 is_set_(false),
33 is_provided_(false),
34 is_shadow_(_is_shadow),
35 is_cache_placeholder_(false),
36 epsilon_change_func_(std::bind(&event::compare, this,
37 std::placeholders::_1, std::placeholders::_2)),
38 reliability_(reliability_type_e::RT_UNKNOWN) {
39 }
40
get_service() const41 service_t event::get_service() const {
42 return (message_->get_service());
43 }
44
set_service(service_t _service)45 void event::set_service(service_t _service) {
46 message_->set_service(_service);
47 }
48
get_instance() const49 instance_t event::get_instance() const {
50 return (message_->get_instance());
51 }
52
set_instance(instance_t _instance)53 void event::set_instance(instance_t _instance) {
54 message_->set_instance(_instance);
55 }
56
get_version() const57 major_version_t event::get_version() const {
58 return message_->get_interface_version();
59 }
60
set_version(major_version_t _major)61 void event::set_version(major_version_t _major) {
62 message_->set_interface_version(_major);
63 }
64
get_event() const65 event_t event::get_event() const {
66 return (message_->get_method());
67 }
68
set_event(event_t _event)69 void event::set_event(event_t _event) {
70 message_->set_method(_event);
71 }
72
get_type() const73 event_type_e event::get_type() const {
74 return (type_);
75 }
76
set_type(const event_type_e _type)77 void event::set_type(const event_type_e _type) {
78 type_ = _type;
79 }
80
is_field() const81 bool event::is_field() const {
82 return (type_ == event_type_e::ET_FIELD);
83 }
84
is_provided() const85 bool event::is_provided() const {
86 return (is_provided_);
87 }
88
set_provided(bool _is_provided)89 void event::set_provided(bool _is_provided) {
90 is_provided_ = _is_provided;
91 }
92
is_set() const93 bool event::is_set() const {
94 return is_set_;
95 }
96
get_payload() const97 const std::shared_ptr<payload> event::get_payload() const {
98 std::lock_guard<std::mutex> its_lock(mutex_);
99 return (message_->get_payload());
100 }
101
set_payload_dont_notify(const std::shared_ptr<payload> & _payload)102 bool event::set_payload_dont_notify(const std::shared_ptr<payload> &_payload) {
103 std::lock_guard<std::mutex> its_lock(mutex_);
104 if (is_cache_placeholder_) {
105 reset_payload(_payload);
106 is_set_ = true;
107 } else {
108 if (set_payload_helper(_payload, false)) {
109 reset_payload(_payload);
110 } else {
111 return false;
112 }
113 }
114 return true;
115 }
116
set_payload(const std::shared_ptr<payload> & _payload,bool _force)117 void event::set_payload(const std::shared_ptr<payload> &_payload, bool _force) {
118 std::lock_guard<std::mutex> its_lock(mutex_);
119 if (is_provided_) {
120 if (set_payload_helper(_payload, _force)) {
121 reset_payload(_payload);
122 if (is_updating_on_change_) {
123 if (change_resets_cycle_)
124 stop_cycle();
125
126 notify();
127
128 if (change_resets_cycle_)
129 start_cycle();
130 }
131 }
132 } else {
133 VSOMEIP_INFO << "Can't set payload for event "
134 << std::hex << std::setw(4) << std::setfill('0')
135 << get_service() << "." << get_instance() << "." << get_event()
136 << " as it isn't provided";
137 }
138 }
139
set_payload(const std::shared_ptr<payload> & _payload,client_t _client,bool _force)140 void event::set_payload(const std::shared_ptr<payload> &_payload, client_t _client,
141 bool _force) {
142 std::lock_guard<std::mutex> its_lock(mutex_);
143 if (is_provided_) {
144 if (set_payload_helper(_payload, _force)) {
145 reset_payload(_payload);
146 if (is_updating_on_change_) {
147 notify_one_unlocked(_client);
148 }
149 }
150 } else {
151 VSOMEIP_INFO << "Can't set payload for event "
152 << std::hex << std::setw(4) << std::setfill('0')
153 << get_service() << "." << get_instance() << "." << get_event()
154 << ". It isn't provided";
155 }
156 }
157
set_payload(const std::shared_ptr<payload> & _payload,const client_t _client,const std::shared_ptr<endpoint_definition> & _target,bool _force)158 void event::set_payload(const std::shared_ptr<payload> &_payload,
159 const client_t _client,
160 const std::shared_ptr<endpoint_definition>& _target,
161 bool _force) {
162 std::lock_guard<std::mutex> its_lock(mutex_);
163 if (is_provided_) {
164 if (set_payload_helper(_payload, _force)) {
165 reset_payload(_payload);
166 if (is_updating_on_change_) {
167 notify_one_unlocked(_client, _target);
168 }
169 }
170 } else {
171 VSOMEIP_INFO << "Can't set payload for event "
172 << std::hex << std::setw(4) << std::setfill('0')
173 << get_service() << "." << get_instance() << "." << get_event()
174 << ". It isn't provided";
175 }
176 }
177
set_payload_notify_pending(const std::shared_ptr<payload> & _payload)178 bool event::set_payload_notify_pending(const std::shared_ptr<payload> &_payload) {
179 std::lock_guard<std::mutex> its_lock(mutex_);
180 if (!is_set_ && is_provided_) {
181 reset_payload(_payload);
182
183 // Send pending initial events.
184 for (const auto &its_target : pending_) {
185 message_->set_session(routing_->get_session());
186 routing_->send_to(VSOMEIP_ROUTING_CLIENT,
187 its_target, message_);
188 }
189 pending_.clear();
190
191 return true;
192 }
193
194 return false;
195 }
196
unset_payload(bool _force)197 void event::unset_payload(bool _force) {
198 std::lock_guard<std::mutex> its_lock(mutex_);
199 if (_force) {
200 is_set_ = false;
201 stop_cycle();
202 message_->set_payload(std::make_shared<payload_impl>());
203 } else {
204 if (is_provided_) {
205 is_set_ = false;
206 stop_cycle();
207 message_->set_payload(std::make_shared<payload_impl>());
208 }
209 }
210 }
211
set_update_cycle(std::chrono::milliseconds & _cycle)212 void event::set_update_cycle(std::chrono::milliseconds &_cycle) {
213 if (is_provided_) {
214 std::lock_guard<std::mutex> its_lock(mutex_);
215 stop_cycle();
216 cycle_ = _cycle;
217 start_cycle();
218 }
219 }
220
set_change_resets_cycle(bool _change_resets_cycle)221 void event::set_change_resets_cycle(bool _change_resets_cycle) {
222 change_resets_cycle_ = _change_resets_cycle;
223 }
224
set_update_on_change(bool _is_active)225 void event::set_update_on_change(bool _is_active) {
226 if (is_provided_) {
227 is_updating_on_change_ = _is_active;
228 }
229 }
230
set_epsilon_change_function(const epsilon_change_func_t & _epsilon_change_func)231 void event::set_epsilon_change_function(const epsilon_change_func_t &_epsilon_change_func) {
232 if (_epsilon_change_func) {
233 std::lock_guard<std::mutex> its_lock(mutex_);
234 epsilon_change_func_ = _epsilon_change_func;
235 }
236 }
237
get_eventgroups() const238 const std::set<eventgroup_t> event::get_eventgroups() const {
239 std::set<eventgroup_t> its_eventgroups;
240 {
241 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
242 for (const auto& e : eventgroups_) {
243 its_eventgroups.insert(e.first);
244 }
245 }
246 return its_eventgroups;
247 }
248
get_eventgroups(client_t _client) const249 std::set<eventgroup_t> event::get_eventgroups(client_t _client) const {
250 std::set<eventgroup_t> its_eventgroups;
251
252 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
253 for (auto e : eventgroups_) {
254 if (e.second.find(_client) != e.second.end())
255 its_eventgroups.insert(e.first);
256 }
257 return its_eventgroups;
258 }
259
add_eventgroup(eventgroup_t _eventgroup)260 void event::add_eventgroup(eventgroup_t _eventgroup) {
261 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
262 if (eventgroups_.find(_eventgroup) == eventgroups_.end())
263 eventgroups_[_eventgroup] = std::set<client_t>();
264 }
265
set_eventgroups(const std::set<eventgroup_t> & _eventgroups)266 void event::set_eventgroups(const std::set<eventgroup_t> &_eventgroups) {
267 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
268 for (auto e : _eventgroups)
269 eventgroups_[e] = std::set<client_t>();
270 }
271
update_cbk(boost::system::error_code const & _error)272 void event::update_cbk(boost::system::error_code const &_error) {
273 if (!_error) {
274 std::lock_guard<std::mutex> its_lock(mutex_);
275 cycle_timer_.expires_from_now(cycle_);
276 notify();
277 auto its_handler =
278 std::bind(&event::update_cbk, shared_from_this(),
279 std::placeholders::_1);
280 cycle_timer_.async_wait(its_handler);
281 }
282 }
283
notify()284 void event::notify() {
285 if (is_set_) {
286 message_->set_session(routing_->get_session());
287 routing_->send(VSOMEIP_ROUTING_CLIENT, message_);
288 } else {
289 VSOMEIP_INFO << __func__
290 << ": Notifying "
291 << std::hex << std::setw(4) << std::setfill('0')
292 << get_service() << "." << get_instance() << "." << get_event()
293 << " failed. Event payload not (yet) set!";
294 }
295 }
296
notify_one(client_t _client,const std::shared_ptr<endpoint_definition> & _target)297 void event::notify_one(client_t _client,
298 const std::shared_ptr<endpoint_definition> &_target) {
299 if (_target) {
300 std::lock_guard<std::mutex> its_lock(mutex_);
301 notify_one_unlocked(_client, _target);
302 } else {
303 VSOMEIP_WARNING << __func__
304 << ": Notifying "
305 << std::hex << std::setw(4) << std::setfill('0')
306 << get_service() << "." << get_instance() << "." << get_event()
307 << " failed. Target undefined";
308 }
309 }
310
notify_one_unlocked(client_t _client,const std::shared_ptr<endpoint_definition> & _target)311 void event::notify_one_unlocked(client_t _client,
312 const std::shared_ptr<endpoint_definition> &_target) {
313 if (_target) {
314 if (is_set_) {
315 message_->set_session(routing_->get_session());
316 routing_->send_to(_client, _target, message_);
317 } else {
318 VSOMEIP_INFO << __func__
319 << ": Notifying "
320 << std::hex << std::setw(4) << std::setfill('0')
321 << get_service() << "." << get_instance() << "." << get_event()
322 << " failed. Event payload not (yet) set!";
323 pending_.insert(_target);
324 }
325 } else {
326 VSOMEIP_WARNING << __func__
327 << ": Notifying "
328 << std::hex << std::setw(4) << std::setfill('0')
329 << get_service() << "." << get_instance() << "." << get_event()
330 << " failed. Target undefined";
331 }
332 }
333
notify_one(client_t _client)334 void event::notify_one(client_t _client) {
335 std::lock_guard<std::mutex> its_lock(mutex_);
336 notify_one_unlocked(_client);
337 }
338
notify_one_unlocked(client_t _client)339 void event::notify_one_unlocked(client_t _client) {
340 if (is_set_) {
341 message_->set_session(routing_->get_session());
342 routing_->send(_client, message_);
343 } else {
344 VSOMEIP_INFO << __func__
345 << ": Notifying "
346 << std::hex << std::setw(4) << std::setfill('0')
347 << get_service() << "." << get_instance() << "." << get_event()
348 << " to client " << _client
349 << " failed. Event payload not set!";
350 }
351 }
352
set_payload_helper(const std::shared_ptr<payload> & _payload,bool _force)353 bool event::set_payload_helper(const std::shared_ptr<payload> &_payload, bool _force) {
354 std::shared_ptr<payload> its_payload = message_->get_payload();
355 bool is_change(type_ != event_type_e::ET_FIELD);
356 if (!is_change) {
357 is_change = _force || epsilon_change_func_(its_payload, _payload);
358 }
359 return is_change;
360 }
361
reset_payload(const std::shared_ptr<payload> & _payload)362 void event::reset_payload(const std::shared_ptr<payload> &_payload) {
363 std::shared_ptr<payload> its_new_payload
364 = runtime::get()->create_payload(
365 _payload->get_data(), _payload->get_length());
366 message_->set_payload(its_new_payload);
367
368 if (!is_set_)
369 start_cycle();
370
371 is_set_ = true;
372 }
373
add_ref(client_t _client,bool _is_provided)374 void event::add_ref(client_t _client, bool _is_provided) {
375 std::lock_guard<std::mutex> its_lock(refs_mutex_);
376 auto its_client = refs_.find(_client);
377 if (its_client == refs_.end()) {
378 refs_[_client][_is_provided] = 1;
379 } else {
380 auto its_provided = its_client->second.find(_is_provided);
381 if (its_provided == its_client->second.end()) {
382 refs_[_client][_is_provided] = 1;
383 } else {
384 its_provided->second++;
385 }
386 }
387 }
388
remove_ref(client_t _client,bool _is_provided)389 void event::remove_ref(client_t _client, bool _is_provided) {
390 std::lock_guard<std::mutex> its_lock(refs_mutex_);
391 auto its_client = refs_.find(_client);
392 if (its_client != refs_.end()) {
393 auto its_provided = its_client->second.find(_is_provided);
394 if (its_provided != its_client->second.end()) {
395 its_provided->second--;
396 if (0 == its_provided->second) {
397 its_client->second.erase(_is_provided);
398 if (0 == its_client->second.size()) {
399 refs_.erase(_client);
400 }
401 }
402 }
403 }
404 }
405
has_ref()406 bool event::has_ref() {
407 std::lock_guard<std::mutex> its_lock(refs_mutex_);
408 return refs_.size() != 0;
409 }
410
add_subscriber(eventgroup_t _eventgroup,client_t _client,bool _force)411 bool event::add_subscriber(eventgroup_t _eventgroup, client_t _client, bool _force) {
412 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
413 bool ret = false;
414 if (_force // remote events managed by rm_impl
415 || is_provided_ // events provided by rm_proxies
416 || is_shadow_ // local events managed by rm_impl
417 || is_cache_placeholder_) {
418 ret = eventgroups_[_eventgroup].insert(_client).second;
419 } else {
420 VSOMEIP_WARNING << __func__ << ": Didnt' insert client "
421 << std::hex << std::setw(4) << std::setfill('0')
422 << _client
423 << " to eventgroup "
424 << std::hex << std::setw(4) << std::setfill('0')
425 << get_service() << "." << get_instance() << "."
426 << _eventgroup;
427 }
428 return ret;
429 }
430
remove_subscriber(eventgroup_t _eventgroup,client_t _client)431 void event::remove_subscriber(eventgroup_t _eventgroup, client_t _client) {
432 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
433 auto find_eventgroup = eventgroups_.find(_eventgroup);
434 if (find_eventgroup != eventgroups_.end())
435 find_eventgroup->second.erase(_client);
436 }
437
has_subscriber(eventgroup_t _eventgroup,client_t _client)438 bool event::has_subscriber(eventgroup_t _eventgroup, client_t _client) {
439 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
440 auto find_eventgroup = eventgroups_.find(_eventgroup);
441 if (find_eventgroup != eventgroups_.end()) {
442 if (_client == ANY_CLIENT) {
443 return (find_eventgroup->second.size() > 0);
444 } else {
445 return (find_eventgroup->second.find(_client)
446 != find_eventgroup->second.end());
447 }
448 }
449 return false;
450 }
451
get_subscribers()452 std::set<client_t> event::get_subscribers() {
453 std::set<client_t> its_subscribers;
454 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
455 for (const auto &e : eventgroups_)
456 its_subscribers.insert(e.second.begin(), e.second.end());
457 return its_subscribers;
458 }
459
clear_subscribers()460 void event::clear_subscribers() {
461 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
462 for (auto &e : eventgroups_)
463 e.second.clear();
464 }
465
has_ref(client_t _client,bool _is_provided)466 bool event::has_ref(client_t _client, bool _is_provided) {
467 std::lock_guard<std::mutex> its_lock(refs_mutex_);
468 auto its_client = refs_.find(_client);
469 if (its_client != refs_.end()) {
470 auto its_provided = its_client->second.find(_is_provided);
471 if (its_provided != its_client->second.end()) {
472 if(its_provided->second > 0) {
473 return true;
474 }
475 }
476 }
477 return false;
478 }
479
is_shadow() const480 bool event::is_shadow() const {
481 return is_shadow_;
482 }
483
set_shadow(bool _shadow)484 void event::set_shadow(bool _shadow) {
485 is_shadow_ = _shadow;
486 }
487
is_cache_placeholder() const488 bool event::is_cache_placeholder() const {
489 return is_cache_placeholder_;
490 }
491
set_cache_placeholder(bool _is_cache_place_holder)492 void event::set_cache_placeholder(bool _is_cache_place_holder) {
493 is_cache_placeholder_ = _is_cache_place_holder;
494 }
495
start_cycle()496 void event::start_cycle() {
497 if (std::chrono::milliseconds::zero() != cycle_) {
498 cycle_timer_.expires_from_now(cycle_);
499 auto its_handler =
500 std::bind(&event::update_cbk, shared_from_this(),
501 std::placeholders::_1);
502 cycle_timer_.async_wait(its_handler);
503 }
504 }
505
stop_cycle()506 void event::stop_cycle() {
507 if (std::chrono::milliseconds::zero() != cycle_) {
508 boost::system::error_code ec;
509 cycle_timer_.cancel(ec);
510 }
511 }
512
compare(const std::shared_ptr<payload> & _lhs,const std::shared_ptr<payload> & _rhs) const513 bool event::compare(const std::shared_ptr<payload> &_lhs,
514 const std::shared_ptr<payload> &_rhs) const {
515 bool is_change = (_lhs->get_length() != _rhs->get_length());
516 if (!is_change) {
517 std::size_t its_pos = 0;
518 const byte_t *its_old_data = _lhs->get_data();
519 const byte_t *its_new_data = _rhs->get_data();
520 while (!is_change && its_pos < _lhs->get_length()) {
521 is_change = (*its_old_data++ != *its_new_data++);
522 its_pos++;
523 }
524 }
525 return is_change;
526 }
527
get_subscribers(eventgroup_t _eventgroup)528 std::set<client_t> event::get_subscribers(eventgroup_t _eventgroup) {
529 std::set<client_t> its_subscribers;
530 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
531 auto found_eventgroup = eventgroups_.find(_eventgroup);
532 if (found_eventgroup != eventgroups_.end()) {
533 its_subscribers = found_eventgroup->second;
534 }
535 return its_subscribers;
536 }
537
is_subscribed(client_t _client)538 bool event::is_subscribed(client_t _client) {
539 std::lock_guard<std::mutex> its_lock(eventgroups_mutex_);
540 for (const auto &egp : eventgroups_) {
541 if (egp.second.find(_client) != egp.second.end()) {
542 return true;
543 }
544 }
545 return false;
546 }
547
548 reliability_type_e
get_reliability() const549 event::get_reliability() const {
550 return reliability_;
551 }
552
553 void
set_reliability(const reliability_type_e _reliability)554 event::set_reliability(const reliability_type_e _reliability) {
555 reliability_ = _reliability;
556 }
557
558 void
remove_pending(const std::shared_ptr<endpoint_definition> & _target)559 event::remove_pending(const std::shared_ptr<endpoint_definition> &_target) {
560 std::lock_guard<std::mutex> its_lock(mutex_);
561 pending_.erase(_target);
562 }
563
564 } // namespace vsomeip_v3
565