1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <chrono>
7 #include <iomanip>
8 #include <sstream>
9 #include <thread>
10 #include <limits>
11
12 #include <boost/asio/buffer.hpp>
13 #include <boost/asio/ip/tcp.hpp>
14 #include <boost/asio/ip/udp.hpp>
15 #include <boost/asio/local/stream_protocol.hpp>
16
17 #include <vsomeip/defines.hpp>
18 #include <vsomeip/internal/logger.hpp>
19
20 #include "../include/client_endpoint_impl.hpp"
21 #include "../include/endpoint_host.hpp"
22 #include "../../utility/include/utility.hpp"
23 #include "../../utility/include/byteorder.hpp"
24
25 namespace vsomeip_v3 {
26
27 template<typename Protocol>
client_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,const endpoint_type & _local,const endpoint_type & _remote,boost::asio::io_service & _io,std::uint32_t _max_message_size,configuration::endpoint_queue_limit_t _queue_limit,const std::shared_ptr<configuration> & _configuration)28 client_endpoint_impl<Protocol>::client_endpoint_impl(
29 const std::shared_ptr<endpoint_host>& _endpoint_host,
30 const std::shared_ptr<routing_host>& _routing_host,
31 const endpoint_type& _local,
32 const endpoint_type& _remote,
33 boost::asio::io_service &_io,
34 std::uint32_t _max_message_size,
35 configuration::endpoint_queue_limit_t _queue_limit,
36 const std::shared_ptr<configuration>& _configuration)
37 : endpoint_impl<Protocol>(_endpoint_host, _routing_host, _local, _io,
38 _max_message_size, _queue_limit, _configuration),
39 socket_(new socket_type(_io)), remote_(_remote),
40 flush_timer_(_io), connect_timer_(_io),
41 connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable
42 state_(cei_state_e::CLOSED),
43 reconnect_counter_(0),
44 train_(_io),
45 queue_size_(0),
46 was_not_connected_(false),
47 local_port_(0),
48 strand_(_io) {
49 }
50
51 template<typename Protocol>
~client_endpoint_impl()52 client_endpoint_impl<Protocol>::~client_endpoint_impl() {
53 }
54
55 template<typename Protocol>
is_client() const56 bool client_endpoint_impl<Protocol>::is_client() const {
57 return true;
58 }
59
60 template<typename Protocol>
is_established() const61 bool client_endpoint_impl<Protocol>::is_established() const {
62 return state_ == cei_state_e::ESTABLISHED;
63 }
64
65 template<typename Protocol>
is_established_or_connected() const66 bool client_endpoint_impl<Protocol>::is_established_or_connected() const {
67 return (state_ == cei_state_e::ESTABLISHED
68 || state_ == cei_state_e::CONNECTED);
69 }
70
71 template<typename Protocol>
set_established(bool _established)72 void client_endpoint_impl<Protocol>::set_established(bool _established) {
73 if (_established) {
74 if (state_ != cei_state_e::CONNECTING) {
75 std::lock_guard<std::mutex> its_lock(socket_mutex_);
76 if (socket_->is_open()) {
77 state_ = cei_state_e::ESTABLISHED;
78 } else {
79 state_ = cei_state_e::CLOSED;
80 }
81 }
82 } else {
83 state_ = cei_state_e::CLOSED;
84 }
85 }
86
87 template<typename Protocol>
set_connected(bool _connected)88 void client_endpoint_impl<Protocol>::set_connected(bool _connected) {
89 if (_connected) {
90 std::lock_guard<std::mutex> its_lock(socket_mutex_);
91 if (socket_->is_open()) {
92 state_ = cei_state_e::CONNECTED;
93 } else {
94 state_ = cei_state_e::CLOSED;
95 }
96 } else {
97 state_ = cei_state_e::CLOSED;
98 }
99 }
100
101 template<typename Protocol>
prepare_stop(endpoint::prepare_stop_handler_t _handler,service_t _service)102 void client_endpoint_impl<Protocol>::prepare_stop(
103 endpoint::prepare_stop_handler_t _handler, service_t _service) {
104 (void) _handler;
105 (void) _service;
106 }
107
108 template<typename Protocol>
stop()109 void client_endpoint_impl<Protocol>::stop() {
110 {
111 std::lock_guard<std::mutex> its_lock(mutex_);
112 endpoint_impl<Protocol>::sending_blocked_ = true;
113 // delete unsent messages
114 queue_.clear();
115 queue_size_ = 0;
116 }
117 {
118 std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
119 boost::system::error_code ec;
120 connect_timer_.cancel(ec);
121 }
122 connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
123
124 // bind to strand as stop() might be called from different thread
125 strand_.dispatch(std::bind(&client_endpoint_impl::shutdown_and_close_socket,
126 this->shared_from_this(),
127 false)
128 );
129 }
130
131 template<typename Protocol>
get_front()132 message_buffer_ptr_t client_endpoint_impl<Protocol>::get_front() {
133 message_buffer_ptr_t its_buffer;
134 if (queue_.size())
135 its_buffer = queue_.front();
136
137 return (its_buffer);
138 }
139
140
141 template<typename Protocol>
send_to(const std::shared_ptr<endpoint_definition> _target,const byte_t * _data,uint32_t _size)142 bool client_endpoint_impl<Protocol>::send_to(
143 const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
144 uint32_t _size) {
145 (void)_target;
146 (void)_data;
147 (void)_size;
148 VSOMEIP_ERROR << "Clients endpoints must not be used to "
149 << "send to explicitely specified targets";
150 return false;
151 }
152
153 template<typename Protocol>
send_error(const std::shared_ptr<endpoint_definition> _target,const byte_t * _data,uint32_t _size)154 bool client_endpoint_impl<Protocol>::send_error(
155 const std::shared_ptr<endpoint_definition> _target, const byte_t *_data,
156 uint32_t _size) {
157 (void)_target;
158 (void)_data;
159 (void)_size;
160 VSOMEIP_ERROR << "Clients endpoints must not be used to "
161 << "send errors to explicitly specified targets";
162 return false;
163 }
164
165
166 template<typename Protocol>
send(const uint8_t * _data,uint32_t _size)167 bool client_endpoint_impl<Protocol>::send(const uint8_t *_data, uint32_t _size) {
168 std::lock_guard<std::mutex> its_lock(mutex_);
169 bool must_depart(false);
170 const bool queue_size_zero_on_entry(queue_.empty());
171 #if 0
172 std::stringstream msg;
173 msg << "cei::send: ";
174 for (uint32_t i = 0; i < _size; i++)
175 msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " ";
176 VSOMEIP_DEBUG << msg.str();
177 #endif
178
179 if (endpoint_impl<Protocol>::sending_blocked_ ||
180 !check_queue_limit(_data, _size)) {
181 return false;
182 }
183 switch (check_message_size(_data, _size)) {
184 case endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT:
185 return true;
186 break;
187 case endpoint_impl<Protocol>::cms_ret_e::MSG_TOO_BIG:
188 return false;
189 break;
190 case endpoint_impl<Protocol>::cms_ret_e::MSG_OK:
191 default:
192 break;
193 }
194
195 // STEP 1: Determine elapsed time and update the departure time and cancel the departure timer
196 train_.update_departure_time_and_stop_departure();
197
198 // STEP 3: Get configured timings
199 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
200 _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]);
201 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
202 _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]);
203 std::chrono::nanoseconds its_debouncing(0), its_retention(0);
204 get_configured_times_from_endpoint(its_service, its_method,
205 &its_debouncing, &its_retention);
206
207 // STEP 4: Check if the passenger enters an empty train
208 const std::pair<service_t, method_t> its_identifier = std::make_pair(
209 its_service, its_method);
210 if (train_.passengers_.empty()) {
211 train_.departure_ = its_retention;
212 } else {
213 // STEP 4.1: Check whether the current train already contains the message
214 if (train_.passengers_.end() != train_.passengers_.find(its_identifier)) {
215 must_depart = true;
216 } else {
217 // STEP 5: Check whether the current message fits into the current train
218 if (train_.buffer_->size() + _size > endpoint_impl<Protocol>::max_message_size_) {
219 must_depart = true;
220 } else {
221 // STEP 6: Check debouncing time
222 if (its_debouncing > train_.minimal_max_retention_time_) {
223 // train's latest departure would already undershot new
224 // passenger's debounce time
225 must_depart = true;
226 } else {
227 if (its_debouncing > train_.departure_) {
228 // train departs earlier as the new passenger's debounce
229 // time allows
230 must_depart = true;
231 } else {
232 // STEP 7: Check maximum retention time
233 if (its_retention < train_.minimal_debounce_time_) {
234 // train's earliest departure would already exceed
235 // the new passenger's retention time.
236 must_depart = true;
237 } else {
238 if (its_retention < train_.departure_) {
239 train_.departure_ = its_retention;
240 }
241 }
242 }
243 }
244 }
245 }
246 }
247
248 // STEP 8: if necessary, send current buffer and create a new one
249 if (must_depart) {
250 // STEP 8.1: check if debounce time would be undershot here if the train
251 // departs. Block sending until train is allowed to depart.
252 wait_until_debounce_time_reached();
253 train_.passengers_.clear();
254 queue_train(queue_size_zero_on_entry);
255 train_.last_departure_ = std::chrono::steady_clock::now();
256 train_.departure_ = its_retention;
257 train_.minimal_debounce_time_ = std::chrono::nanoseconds::max();
258 train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max();
259 }
260
261 // STEP 9: insert current message buffer
262 train_.buffer_->insert(train_.buffer_->end(), _data, _data + _size);
263 train_.passengers_.insert(its_identifier);
264 // STEP 9.1: update the trains minimal debounce time if necessary
265 if (its_debouncing < train_.minimal_debounce_time_) {
266 train_.minimal_debounce_time_ = its_debouncing;
267 }
268 // STEP 9.2: update the trains minimal maximum retention time if necessary
269 if (its_retention < train_.minimal_max_retention_time_) {
270 train_.minimal_max_retention_time_ = its_retention;
271 }
272
273 // STEP 10: restart timer with current departure time
274 #ifndef _WIN32
275 train_.departure_timer_->expires_from_now(train_.departure_);
276 #else
277 train_.departure_timer_->expires_from_now(
278 std::chrono::duration_cast<
279 std::chrono::steady_clock::duration>(train_.departure_));
280 #endif
281 train_.departure_timer_->async_wait(
282 std::bind(&client_endpoint_impl<Protocol>::flush_cbk,
283 this->shared_from_this(), std::placeholders::_1));
284
285 return true;
286 }
287
288 template<typename Protocol>
send_segments(const tp::tp_split_messages_t & _segments)289 void client_endpoint_impl<Protocol>::send_segments(
290 const tp::tp_split_messages_t &_segments) {
291 if (_segments.size() == 0) {
292 return;
293 }
294 const bool queue_size_zero_on_entry(queue_.empty());
295
296 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
297 (*(_segments[0]))[VSOMEIP_SERVICE_POS_MIN],
298 (*(_segments[0]))[VSOMEIP_SERVICE_POS_MAX]);
299 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
300 (*(_segments[0]))[VSOMEIP_METHOD_POS_MIN],
301 (*(_segments[0]))[VSOMEIP_METHOD_POS_MAX]);
302 std::chrono::nanoseconds its_debouncing(0), its_retention(0);
303 get_configured_times_from_endpoint(its_service, its_method,
304 &its_debouncing, &its_retention);
305 // update the trains minimal debounce time if necessary
306 if (its_debouncing < train_.minimal_debounce_time_) {
307 train_.minimal_debounce_time_ = its_debouncing;
308 }
309 // update the trains minimal maximum retention time if necessary
310 if (its_retention < train_.minimal_max_retention_time_) {
311 train_.minimal_max_retention_time_ = its_retention;
312 }
313
314 // We only need to respect the debouncing. There is no need to wait for further
315 // messages as we will send several now anyway.
316 if (!train_.passengers_.empty()) {
317 wait_until_debounce_time_reached();
318 train_.passengers_.clear();
319 queue_train(queue_size_zero_on_entry);
320 train_.last_departure_ = std::chrono::steady_clock::now();
321 train_.departure_ = its_retention;
322 train_.minimal_debounce_time_ = std::chrono::nanoseconds::max();
323 train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max();
324 }
325 const bool queue_size_still_zero(queue_.empty());
326 for (const auto& s : _segments) {
327 queue_.emplace_back(s);
328 queue_size_ += s->size();
329 }
330
331 if (queue_size_still_zero && !queue_.empty()) { // no writing in progress
332 // respect minimal debounce time
333 wait_until_debounce_time_reached();
334 // ignore retention time and send immediately as the train is full anyway
335 auto its_buffer = get_front();
336 if (its_buffer) {
337 strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
338 this->shared_from_this(), its_buffer));
339 }
340 }
341 train_.last_departure_ = std::chrono::steady_clock::now();
342 }
343
344 template<typename Protocol>
wait_until_debounce_time_reached() const345 void client_endpoint_impl<Protocol>::wait_until_debounce_time_reached() const {
346 const std::chrono::nanoseconds time_since_last_departure =
347 std::chrono::duration_cast<std::chrono::nanoseconds>(
348 std::chrono::steady_clock::now() - train_.last_departure_);
349 if (time_since_last_departure < train_.minimal_debounce_time_) {
350 std::this_thread::sleep_for(
351 train_.minimal_debounce_time_ - time_since_last_departure);
352 }
353 }
354
355 template<typename Protocol>
send(const std::vector<byte_t> & _cmd_header,const byte_t * _data,uint32_t _size)356 bool client_endpoint_impl<Protocol>::send(const std::vector<byte_t>& _cmd_header,
357 const byte_t *_data, uint32_t _size) {
358 (void) _cmd_header;
359 (void) _data;
360 (void) _size;
361 return false;
362 }
363
364 template<typename Protocol>
flush()365 bool client_endpoint_impl<Protocol>::flush() {
366 bool is_successful(true);
367 std::lock_guard<std::mutex> its_lock(mutex_);
368 if (!train_.buffer_->empty()) {
369 queue_train(!queue_.size());
370 train_.last_departure_ = std::chrono::steady_clock::now();
371 train_.passengers_.clear();
372 train_.minimal_debounce_time_ = std::chrono::nanoseconds::max();
373 train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max();
374 } else {
375 is_successful = false;
376 }
377
378 return is_successful;
379 }
380
381 template<typename Protocol>
connect_cbk(boost::system::error_code const & _error)382 void client_endpoint_impl<Protocol>::connect_cbk(
383 boost::system::error_code const &_error) {
384 if (_error == boost::asio::error::operation_aborted
385 || endpoint_impl<Protocol>::sending_blocked_) {
386 // endpoint was stopped
387 shutdown_and_close_socket(false);
388 return;
389 }
390 std::shared_ptr<endpoint_host> its_host = this->endpoint_host_.lock();
391 if (its_host) {
392 if (_error && _error != boost::asio::error::already_connected) {
393 shutdown_and_close_socket(true);
394
395 if (state_ != cei_state_e::ESTABLISHED) {
396 state_ = cei_state_e::CLOSED;
397 its_host->on_disconnect(this->shared_from_this());
398 }
399 if (get_max_allowed_reconnects() == MAX_RECONNECTS_UNLIMITED ||
400 get_max_allowed_reconnects() >= ++reconnect_counter_) {
401 start_connect_timer();
402 } else {
403 max_allowed_reconnects_reached();
404 }
405 // Double the timeout as long as the maximum allowed is larger
406 if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT)
407 connect_timeout_ = (connect_timeout_ << 1);
408 } else {
409 {
410 std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
411 connect_timer_.cancel();
412 }
413 connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; // TODO: use config variable
414 reconnect_counter_ = 0;
415 set_local_port();
416 if (was_not_connected_) {
417 was_not_connected_ = false;
418 std::lock_guard<std::mutex> its_lock(mutex_);
419 auto its_buffer = get_front();
420 if (its_buffer) {
421 strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
422 this->shared_from_this(), its_buffer));
423 VSOMEIP_WARNING << __func__ << ": resume sending to: "
424 << get_remote_information();
425 }
426 }
427 if (state_ != cei_state_e::ESTABLISHED) {
428 its_host->on_connect(this->shared_from_this());
429 }
430 receive();
431 }
432 }
433 }
434
435 template<typename Protocol>
wait_connect_cbk(boost::system::error_code const & _error)436 void client_endpoint_impl<Protocol>::wait_connect_cbk(
437 boost::system::error_code const &_error) {
438 if (!_error && !client_endpoint_impl<Protocol>::sending_blocked_) {
439 auto self = this->shared_from_this();
440 strand_.dispatch(std::bind(&client_endpoint_impl::connect,
441 this->shared_from_this()));
442 }
443 }
444
445 template<typename Protocol>
send_cbk(boost::system::error_code const & _error,std::size_t _bytes,const message_buffer_ptr_t & _sent_msg)446 void client_endpoint_impl<Protocol>::send_cbk(
447 boost::system::error_code const &_error, std::size_t _bytes,
448 const message_buffer_ptr_t& _sent_msg) {
449 (void)_bytes;
450 if (!_error) {
451 std::lock_guard<std::mutex> its_lock(mutex_);
452 if (queue_.size() > 0) {
453 queue_size_ -= queue_.front()->size();
454 queue_.pop_front();
455 auto its_buffer = get_front();
456 if (its_buffer)
457 send_queued(its_buffer);
458 }
459 } else if (_error == boost::asio::error::broken_pipe) {
460 state_ = cei_state_e::CLOSED;
461 bool stopping(false);
462 {
463 std::lock_guard<std::mutex> its_lock(mutex_);
464 stopping = endpoint_impl<Protocol>::sending_blocked_;
465 if (stopping) {
466 queue_.clear();
467 queue_size_ = 0;
468 } else {
469 service_t its_service(0);
470 method_t its_method(0);
471 client_t its_client(0);
472 session_t its_session(0);
473 if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
474 its_service = VSOMEIP_BYTES_TO_WORD(
475 (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
476 (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
477 its_method = VSOMEIP_BYTES_TO_WORD(
478 (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
479 (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
480 its_client = VSOMEIP_BYTES_TO_WORD(
481 (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
482 (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
483 its_session = VSOMEIP_BYTES_TO_WORD(
484 (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
485 (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
486 }
487 VSOMEIP_WARNING << "cei::send_cbk received error: "
488 << _error.message() << " (" << std::dec
489 << _error.value() << ") " << get_remote_information()
490 << " " << std::dec << queue_.size()
491 << " " << std::dec << queue_size_ << " ("
492 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
493 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
494 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
495 << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
496 }
497 }
498 if (!stopping) {
499 print_status();
500 }
501 was_not_connected_ = true;
502 shutdown_and_close_socket(true);
503 strand_.dispatch(std::bind(&client_endpoint_impl::connect,
504 this->shared_from_this()));
505 } else if (_error == boost::asio::error::not_connected
506 || _error == boost::asio::error::bad_descriptor
507 || _error == boost::asio::error::no_permission) {
508 state_ = cei_state_e::CLOSED;
509 if (_error == boost::asio::error::no_permission) {
510 VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
511 << " (" << std::dec << _error.value() << ") "
512 << get_remote_information();
513 std::lock_guard<std::mutex> its_lock(mutex_);
514 queue_.clear();
515 queue_size_ = 0;
516 }
517 was_not_connected_ = true;
518 shutdown_and_close_socket(true);
519 strand_.dispatch(std::bind(&client_endpoint_impl::connect,
520 this->shared_from_this()));
521 } else if (_error == boost::asio::error::operation_aborted) {
522 VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message();
523 // endpoint was stopped
524 endpoint_impl<Protocol>::sending_blocked_ = true;
525 shutdown_and_close_socket(false);
526 } else if (_error == boost::system::errc::destination_address_required) {
527 VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
528 << " (" << std::dec << _error.value() << ") "
529 << get_remote_information();
530 was_not_connected_ = true;
531 } else {
532 service_t its_service(0);
533 method_t its_method(0);
534 client_t its_client(0);
535 session_t its_session(0);
536 if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) {
537 its_service = VSOMEIP_BYTES_TO_WORD(
538 (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN],
539 (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]);
540 its_method = VSOMEIP_BYTES_TO_WORD(
541 (*_sent_msg)[VSOMEIP_METHOD_POS_MIN],
542 (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]);
543 its_client = VSOMEIP_BYTES_TO_WORD(
544 (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN],
545 (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]);
546 its_session = VSOMEIP_BYTES_TO_WORD(
547 (*_sent_msg)[VSOMEIP_SESSION_POS_MIN],
548 (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]);
549 }
550 VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message()
551 << " (" << std::dec << _error.value() << ") "
552 << get_remote_information() << " "
553 << " " << std::dec << queue_.size()
554 << " " << std::dec << queue_size_ << " ("
555 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
556 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
557 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
558 << std::hex << std::setw(4) << std::setfill('0') << its_session << "]";
559 print_status();
560 }
561 }
562
563 template<typename Protocol>
flush_cbk(boost::system::error_code const & _error)564 void client_endpoint_impl<Protocol>::flush_cbk(
565 boost::system::error_code const &_error) {
566 if (!_error) {
567 (void) flush();
568 }
569 }
570
571 template<typename Protocol>
shutdown_and_close_socket(bool _recreate_socket)572 void client_endpoint_impl<Protocol>::shutdown_and_close_socket(bool _recreate_socket) {
573 std::lock_guard<std::mutex> its_lock(socket_mutex_);
574 shutdown_and_close_socket_unlocked(_recreate_socket);
575 }
576
577 template<typename Protocol>
shutdown_and_close_socket_unlocked(bool _recreate_socket)578 void client_endpoint_impl<Protocol>::shutdown_and_close_socket_unlocked(bool _recreate_socket) {
579 local_port_ = 0;
580 if (socket_->is_open()) {
581 #ifndef _WIN32
582 if (-1 == fcntl(socket_->native_handle(), F_GETFD)) {
583 VSOMEIP_ERROR << "cei::shutdown_and_close_socket_unlocked: socket/handle closed already '"
584 << std::string(std::strerror(errno))
585 << "' (" << errno << ") " << get_remote_information();
586 }
587 #endif
588 boost::system::error_code its_error;
589 socket_->shutdown(Protocol::socket::shutdown_both, its_error);
590 socket_->close(its_error);
591 }
592 if (_recreate_socket) {
593 socket_.reset(new socket_type(endpoint_impl<Protocol>::service_));
594 }
595 }
596
597 template<typename Protocol>
get_remote_address(boost::asio::ip::address & _address) const598 bool client_endpoint_impl<Protocol>::get_remote_address(
599 boost::asio::ip::address &_address) const {
600 (void)_address;
601 return false;
602 }
603
604 template<typename Protocol>
get_remote_port() const605 std::uint16_t client_endpoint_impl<Protocol>::get_remote_port() const {
606 return 0;
607 }
608
609 template<typename Protocol>
get_local_port() const610 std::uint16_t client_endpoint_impl<Protocol>::get_local_port() const {
611 return local_port_;
612 }
613
614 template<typename Protocol>
set_local_port(uint16_t _port)615 void client_endpoint_impl<Protocol>::set_local_port(uint16_t _port) {
616 local_port_ = _port;
617 }
618
619 template<typename Protocol>
start_connect_timer()620 void client_endpoint_impl<Protocol>::start_connect_timer() {
621 std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
622 connect_timer_.expires_from_now(
623 std::chrono::milliseconds(connect_timeout_));
624 connect_timer_.async_wait(
625 std::bind(&client_endpoint_impl<Protocol>::wait_connect_cbk,
626 this->shared_from_this(), std::placeholders::_1));
627 }
628
629 template<typename Protocol>
check_message_size(const std::uint8_t * const _data,std::uint32_t _size)630 typename endpoint_impl<Protocol>::cms_ret_e client_endpoint_impl<Protocol>::check_message_size(
631 const std::uint8_t * const _data, std::uint32_t _size) {
632 typename endpoint_impl<Protocol>::cms_ret_e ret(endpoint_impl<Protocol>::cms_ret_e::MSG_OK);
633 if (endpoint_impl<Protocol>::max_message_size_ != MESSAGE_SIZE_UNLIMITED
634 && _size > endpoint_impl<Protocol>::max_message_size_) {
635 if (endpoint_impl<Protocol>::is_supporting_someip_tp_ && _data != nullptr) {
636 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
637 _data[VSOMEIP_SERVICE_POS_MIN],
638 _data[VSOMEIP_SERVICE_POS_MAX]);
639 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
640 _data[VSOMEIP_METHOD_POS_MIN],
641 _data[VSOMEIP_METHOD_POS_MAX]);
642 if (tp_segmentation_enabled(its_service, its_method)) {
643 send_segments(tp::tp::tp_split_message(_data, _size));
644 return endpoint_impl<Protocol>::cms_ret_e::MSG_WAS_SPLIT;
645 }
646 }
647 VSOMEIP_ERROR << "cei::check_message_size: Dropping too big message ("
648 << std::dec << _size << " Bytes). Maximum allowed message size is: "
649 << endpoint_impl<Protocol>::max_message_size_ << " Bytes.";
650 ret = endpoint_impl<Protocol>::cms_ret_e::MSG_TOO_BIG;
651 }
652 return ret;
653 }
654
655 template<typename Protocol>
check_queue_limit(const uint8_t * _data,std::uint32_t _size) const656 bool client_endpoint_impl<Protocol>::check_queue_limit(const uint8_t *_data, std::uint32_t _size) const {
657 if (endpoint_impl<Protocol>::queue_limit_ != QUEUE_SIZE_UNLIMITED
658 && queue_size_ + _size > endpoint_impl<Protocol>::queue_limit_) {
659 service_t its_service(0);
660 method_t its_method(0);
661 client_t its_client(0);
662 session_t its_session(0);
663 if (_size >= VSOMEIP_SESSION_POS_MAX) {
664 // this will yield wrong IDs for local communication as the commands
665 // are prepended to the actual payload
666 // it will print:
667 // (lowbyte service ID + highbyte methoid)
668 // [(Command + lowerbyte sender's client ID).
669 // highbyte sender's client ID + lowbyte command size.
670 // lowbyte methodid + highbyte vsomeip length]
671 its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
672 _data[VSOMEIP_SERVICE_POS_MAX]);
673 its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
674 _data[VSOMEIP_METHOD_POS_MAX]);
675 its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
676 _data[VSOMEIP_CLIENT_POS_MAX]);
677 its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
678 _data[VSOMEIP_SESSION_POS_MAX]);
679 }
680 VSOMEIP_ERROR << "cei::check_queue_limit: queue size limit (" << std::dec
681 << endpoint_impl<Protocol>::queue_limit_
682 << ") reached. Dropping message ("
683 << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): ["
684 << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
685 << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
686 << std::hex << std::setw(4) << std::setfill('0') << its_session << "] "
687 << "queue_size: " << std::dec << queue_size_
688 << " data size: " << std::dec << _size;
689 return false;
690 }
691 return true;
692 }
693
694 template<typename Protocol>
queue_train(bool _queue_size_zero_on_entry)695 void client_endpoint_impl<Protocol>::queue_train(bool _queue_size_zero_on_entry) {
696 queue_.push_back(train_.buffer_);
697 queue_size_ += train_.buffer_->size();
698 train_.buffer_ = std::make_shared<message_buffer_t>();
699 if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress
700 auto its_buffer = get_front();
701 if (its_buffer) {
702 strand_.dispatch(std::bind(&client_endpoint_impl::send_queued,
703 this->shared_from_this(), its_buffer));
704 }
705 }
706 }
707
708 template<typename Protocol>
get_queue_size() const709 size_t client_endpoint_impl<Protocol>::get_queue_size() const {
710 std::lock_guard<std::mutex> its_lock(mutex_);
711 return queue_size_;
712 }
713
// Instantiate template
// Explicit instantiations for the protocols this class is used with. The
// boost::asio::local stream protocol variant is left out when _WIN32 is
// defined.
#ifndef _WIN32
template class client_endpoint_impl<boost::asio::local::stream_protocol>;
#endif
template class client_endpoint_impl<boost::asio::ip::tcp>;
template class client_endpoint_impl<boost::asio::ip::udp>;
720
721 } // namespace vsomeip_v3
722
723