1
2 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7 #include <iomanip>
8
9 #include <boost/asio/write.hpp>
10
11 #include <vsomeip/constants.hpp>
12 #include <vsomeip/internal/logger.hpp>
13
14 #include "../include/endpoint_definition.hpp"
15 #include "../include/endpoint_host.hpp"
16 #include "../../routing/include/routing_host.hpp"
17 #include "../include/tcp_server_endpoint_impl.hpp"
18 #include "../../utility/include/utility.hpp"
19 #include "../../utility/include/byteorder.hpp"
20
21 namespace ip = boost::asio::ip;
22
23 namespace vsomeip_v3 {
24
tcp_server_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,const endpoint_type & _local,boost::asio::io_service & _io,const std::shared_ptr<configuration> & _configuration)25 tcp_server_endpoint_impl::tcp_server_endpoint_impl(
26 const std::shared_ptr<endpoint_host>& _endpoint_host,
27 const std::shared_ptr<routing_host>& _routing_host,
28 const endpoint_type& _local,
29 boost::asio::io_service &_io,
30 const std::shared_ptr<configuration>& _configuration)
31 : tcp_server_endpoint_base_impl(_endpoint_host, _routing_host, _local, _io,
32 _configuration->get_max_message_size_reliable(_local.address().to_string(), _local.port()),
33 _configuration->get_endpoint_queue_limit(_local.address().to_string(), _local.port()),
34 _configuration),
35 acceptor_(_io),
36 buffer_shrink_threshold_(configuration_->get_buffer_shrink_threshold()),
37 local_port_(_local.port()),
38 // send timeout after 2/3 of configured ttl, warning after 1/3
39 send_timeout_(configuration_->get_sd_ttl() * 666) {
40 is_supporting_magic_cookies_ = true;
41
42 boost::system::error_code ec;
43 acceptor_.open(_local.protocol(), ec);
44 boost::asio::detail::throw_error(ec, "acceptor open");
45 acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
46 boost::asio::detail::throw_error(ec, "acceptor set_option");
47
48 #ifndef _WIN32
49 // If specified, bind to device
50 std::string its_device(configuration_->get_device());
51 if (its_device != "") {
52 if (setsockopt(acceptor_.native_handle(),
53 SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
54 VSOMEIP_WARNING << "TCP Server: Could not bind to device \"" << its_device << "\"";
55 }
56 }
57 #endif
58
59 acceptor_.bind(_local, ec);
60 boost::asio::detail::throw_error(ec, "acceptor bind");
61 acceptor_.listen(boost::asio::socket_base::max_connections, ec);
62 boost::asio::detail::throw_error(ec, "acceptor listen");
63 }
64
~tcp_server_endpoint_impl()65 tcp_server_endpoint_impl::~tcp_server_endpoint_impl() {
66 }
67
is_local() const68 bool tcp_server_endpoint_impl::is_local() const {
69 return false;
70 }
71
start()72 void tcp_server_endpoint_impl::start() {
73 std::lock_guard<std::mutex> its_lock(acceptor_mutex_);
74 if (acceptor_.is_open()) {
75 connection::ptr new_connection = connection::create(
76 std::dynamic_pointer_cast<tcp_server_endpoint_impl>(
77 shared_from_this()), max_message_size_,
78 buffer_shrink_threshold_, has_enabled_magic_cookies_,
79 service_, send_timeout_);
80
81 {
82 std::unique_lock<std::mutex> its_socket_lock(new_connection->get_socket_lock());
83 acceptor_.async_accept(new_connection->get_socket(),
84 std::bind(&tcp_server_endpoint_impl::accept_cbk,
85 std::dynamic_pointer_cast<tcp_server_endpoint_impl>(
86 shared_from_this()), new_connection,
87 std::placeholders::_1));
88 }
89 }
90 }
91
stop()92 void tcp_server_endpoint_impl::stop() {
93 server_endpoint_impl::stop();
94 {
95 std::lock_guard<std::mutex> its_lock(acceptor_mutex_);
96 if(acceptor_.is_open()) {
97 boost::system::error_code its_error;
98 acceptor_.close(its_error);
99 }
100 }
101 {
102 std::lock_guard<std::mutex> its_lock(connections_mutex_);
103 for (const auto &c : connections_) {
104 c.second->stop();
105 }
106 connections_.clear();
107 }
108 }
109
send_to(const std::shared_ptr<endpoint_definition> _target,const byte_t * _data,uint32_t _size)110 bool tcp_server_endpoint_impl::send_to(
111 const std::shared_ptr<endpoint_definition> _target,
112 const byte_t *_data, uint32_t _size) {
113 std::lock_guard<std::mutex> its_lock(mutex_);
114 endpoint_type its_target(_target->get_address(), _target->get_port());
115 return send_intern(its_target, _data, _size);
116 }
117
send_error(const std::shared_ptr<endpoint_definition> _target,const byte_t * _data,uint32_t _size)118 bool tcp_server_endpoint_impl::send_error(
119 const std::shared_ptr<endpoint_definition> _target,
120 const byte_t *_data, uint32_t _size) {
121 bool ret(false);
122 std::lock_guard<std::mutex> its_lock(mutex_);
123 const endpoint_type its_target(_target->get_address(), _target->get_port());
124 const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target));
125 auto& its_qpair = target_queue_iterator->second;
126 const bool queue_size_zero_on_entry(its_qpair.second.empty());
127
128 if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK &&
129 check_queue_limit(_data, _size, its_qpair.first)) {
130 its_qpair.second.emplace_back(
131 std::make_shared<message_buffer_t>(_data, _data + _size));
132 its_qpair.first += _size;
133
134 if (queue_size_zero_on_entry) { // no writing in progress
135 send_queued(target_queue_iterator);
136 }
137 ret = true;
138 }
139 return ret;
140 }
141
send_queued(const queue_iterator_type _queue_iterator)142 void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iterator) {
143 connection::ptr its_connection;
144 {
145 std::lock_guard<std::mutex> its_lock(connections_mutex_);
146 auto connection_iterator = connections_.find(_queue_iterator->first);
147 if (connection_iterator != connections_.end()) {
148 its_connection = connection_iterator->second;
149 } else {
150 VSOMEIP_INFO << "Didn't find connection: "
151 << _queue_iterator->first.address().to_string() << ":" << std::dec
152 << static_cast<std::uint16_t>(_queue_iterator->first.port())
153 << " dropping outstanding messages (" << std::dec
154 << _queue_iterator->second.second.size() << ").";
155
156 if (_queue_iterator->second.second.size()) {
157 std::set<service_t> its_services;
158
159 // check all outstanding messages of this connection
160 // whether stop handlers need to be called
161 for (const auto &its_buffer : _queue_iterator->second.second) {
162 if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) {
163 service_t its_service = VSOMEIP_BYTES_TO_WORD(
164 (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
165 (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
166 its_services.insert(its_service);
167 }
168 }
169
170 for (auto its_service : its_services) {
171 auto found_cbk = prepare_stop_handlers_.find(its_service);
172 if (found_cbk != prepare_stop_handlers_.end()) {
173 VSOMEIP_INFO << "Calling prepare stop handler "
174 << "for service: 0x"
175 << std::hex << std::setw(4) << std::setfill('0')
176 << its_service;
177 auto handler = found_cbk->second;
178 auto ptr = this->shared_from_this();
179 service_.post([ptr, handler, its_service](){
180 handler(ptr, its_service);
181 });
182 prepare_stop_handlers_.erase(found_cbk);
183 }
184 }
185 }
186
187 queues_.erase(_queue_iterator->first);
188 }
189 }
190 if (its_connection) {
191 its_connection->send_queued(_queue_iterator);
192 }
193 }
194
get_configured_times_from_endpoint(service_t _service,method_t _method,std::chrono::nanoseconds * _debouncing,std::chrono::nanoseconds * _maximum_retention) const195 void tcp_server_endpoint_impl::get_configured_times_from_endpoint(
196 service_t _service, method_t _method,
197 std::chrono::nanoseconds *_debouncing,
198 std::chrono::nanoseconds *_maximum_retention) const {
199 configuration_->get_configured_timing_responses(_service,
200 tcp_server_endpoint_base_impl::local_.address().to_string(),
201 tcp_server_endpoint_base_impl::local_.port(), _method,
202 _debouncing, _maximum_retention);
203 }
204
is_established(const std::shared_ptr<endpoint_definition> & _endpoint)205 bool tcp_server_endpoint_impl::is_established(const std::shared_ptr<endpoint_definition>& _endpoint) {
206 bool is_connected = false;
207 endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port());
208 {
209 std::lock_guard<std::mutex> its_lock(connections_mutex_);
210 auto connection_iterator = connections_.find(endpoint);
211 if (connection_iterator != connections_.end()) {
212 is_connected = true;
213 } else {
214 VSOMEIP_INFO << "Didn't find TCP connection: Subscription "
215 << "rejected for: " << endpoint.address().to_string() << ":"
216 << std::dec << static_cast<std::uint16_t>(endpoint.port());
217 }
218 }
219 return is_connected;
220 }
221
get_default_target(service_t,tcp_server_endpoint_impl::endpoint_type &) const222 bool tcp_server_endpoint_impl::get_default_target(service_t,
223 tcp_server_endpoint_impl::endpoint_type &) const {
224 return false;
225 }
226
remove_connection(tcp_server_endpoint_impl::connection * _connection)227 void tcp_server_endpoint_impl::remove_connection(
228 tcp_server_endpoint_impl::connection *_connection) {
229 std::lock_guard<std::mutex> its_lock(connections_mutex_);
230 for (auto it = connections_.begin(); it != connections_.end();) {
231 if (it->second.get() == _connection) {
232 it = connections_.erase(it);
233 break;
234 } else {
235 ++it;
236 }
237 }
238 }
239
accept_cbk(const connection::ptr & _connection,boost::system::error_code const & _error)240 void tcp_server_endpoint_impl::accept_cbk(const connection::ptr& _connection,
241 boost::system::error_code const &_error) {
242
243 if (!_error) {
244 boost::system::error_code its_error;
245 endpoint_type remote;
246 {
247 std::unique_lock<std::mutex> its_socket_lock(_connection->get_socket_lock());
248 socket_type &new_connection_socket = _connection->get_socket();
249 remote = new_connection_socket.remote_endpoint(its_error);
250 _connection->set_remote_info(remote);
251 // Nagle algorithm off
252 new_connection_socket.set_option(ip::tcp::no_delay(true), its_error);
253
254 new_connection_socket.set_option(boost::asio::socket_base::keep_alive(true), its_error);
255 if (its_error) {
256 VSOMEIP_WARNING << "tcp_server_endpoint::connect: couldn't enable "
257 << "keep_alive: " << its_error.message();
258 }
259 }
260 if (!its_error) {
261 {
262 std::lock_guard<std::mutex> its_lock(connections_mutex_);
263 connections_[remote] = _connection;
264 }
265 _connection->start();
266 }
267 }
268 if (_error != boost::asio::error::bad_descriptor
269 && _error != boost::asio::error::operation_aborted
270 && _error != boost::asio::error::no_descriptors) {
271 start();
272 } else if (_error == boost::asio::error::no_descriptors) {
273 VSOMEIP_ERROR<< "tcp_server_endpoint_impl::accept_cbk: "
274 << _error.message() << " (" << std::dec << _error.value()
275 << ") Will try to accept again in 1000ms";
276 std::shared_ptr<boost::asio::steady_timer> its_timer =
277 std::make_shared<boost::asio::steady_timer>(service_,
278 std::chrono::milliseconds(1000));
279 auto its_ep = std::dynamic_pointer_cast<tcp_server_endpoint_impl>(
280 shared_from_this());
281 its_timer->async_wait([its_timer, its_ep]
282 (const boost::system::error_code& _error) {
283 if (!_error) {
284 its_ep->start();
285 }
286 });
287 }
288 }
289
get_local_port() const290 std::uint16_t tcp_server_endpoint_impl::get_local_port() const {
291 return local_port_;
292 }
293
set_local_port(std::uint16_t _port)294 void tcp_server_endpoint_impl::set_local_port(std::uint16_t _port) {
295 (void)_port;
296 }
297
is_reliable() const298 bool tcp_server_endpoint_impl::is_reliable() const {
299 return true;
300 }
301
///////////////////////////////////////////////////////////////////////////////
// class tcp_server_endpoint_impl::connection
///////////////////////////////////////////////////////////////////////////////
connection(const std::weak_ptr<tcp_server_endpoint_impl> & _server,std::uint32_t _max_message_size,std::uint32_t _recv_buffer_size_initial,std::uint32_t _buffer_shrink_threshold,bool _magic_cookies_enabled,boost::asio::io_service & _io_service,std::chrono::milliseconds _send_timeout)305 tcp_server_endpoint_impl::connection::connection(
306 const std::weak_ptr<tcp_server_endpoint_impl>& _server,
307 std::uint32_t _max_message_size,
308 std::uint32_t _recv_buffer_size_initial,
309 std::uint32_t _buffer_shrink_threshold,
310 bool _magic_cookies_enabled,
311 boost::asio::io_service &_io_service,
312 std::chrono::milliseconds _send_timeout) :
313 socket_(_io_service),
314 server_(_server),
315 max_message_size_(_max_message_size),
316 recv_buffer_size_initial_(_recv_buffer_size_initial),
317 recv_buffer_(_recv_buffer_size_initial, 0),
318 recv_buffer_size_(0),
319 missing_capacity_(0),
320 shrink_count_(0),
321 buffer_shrink_threshold_(_buffer_shrink_threshold),
322 remote_port_(0),
323 magic_cookies_enabled_(_magic_cookies_enabled),
324 last_cookie_sent_(std::chrono::steady_clock::now() - std::chrono::seconds(11)),
325 send_timeout_(_send_timeout),
326 send_timeout_warning_(_send_timeout / 2) {
327 }
328
329 tcp_server_endpoint_impl::connection::ptr
create(const std::weak_ptr<tcp_server_endpoint_impl> & _server,std::uint32_t _max_message_size,std::uint32_t _buffer_shrink_threshold,bool _magic_cookies_enabled,boost::asio::io_service & _io_service,std::chrono::milliseconds _send_timeout)330 tcp_server_endpoint_impl::connection::create(
331 const std::weak_ptr<tcp_server_endpoint_impl>& _server,
332 std::uint32_t _max_message_size,
333 std::uint32_t _buffer_shrink_threshold,
334 bool _magic_cookies_enabled,
335 boost::asio::io_service & _io_service,
336 std::chrono::milliseconds _send_timeout) {
337 const std::uint32_t its_initial_receveive_buffer_size =
338 VSOMEIP_SOMEIP_HEADER_SIZE + 8 + MAGIC_COOKIE_SIZE + 8;
339 return ptr(new connection(_server, _max_message_size,
340 its_initial_receveive_buffer_size,
341 _buffer_shrink_threshold, _magic_cookies_enabled,
342 _io_service, _send_timeout));
343 }
344
345 tcp_server_endpoint_impl::socket_type &
get_socket()346 tcp_server_endpoint_impl::connection::get_socket() {
347 return socket_;
348 }
349
350 std::unique_lock<std::mutex>
get_socket_lock()351 tcp_server_endpoint_impl::connection::get_socket_lock() {
352 return std::unique_lock<std::mutex>(socket_mutex_);
353 }
354
start()355 void tcp_server_endpoint_impl::connection::start() {
356 receive();
357 }
358
receive()359 void tcp_server_endpoint_impl::connection::receive() {
360 std::lock_guard<std::mutex> its_lock(socket_mutex_);
361 if(socket_.is_open()) {
362 const std::size_t its_capacity(recv_buffer_.capacity());
363 size_t buffer_size = its_capacity - recv_buffer_size_;
364 try {
365 if (missing_capacity_) {
366 if (missing_capacity_ > MESSAGE_SIZE_UNLIMITED) {
367 VSOMEIP_ERROR << "Missing receive buffer capacity exceeds allowed maximum!";
368 return;
369 }
370 const std::size_t its_required_capacity(recv_buffer_size_ + missing_capacity_);
371 if (its_capacity < its_required_capacity) {
372 recv_buffer_.reserve(its_required_capacity);
373 recv_buffer_.resize(its_required_capacity, 0x0);
374 if (recv_buffer_.size() > 1048576) {
375 VSOMEIP_INFO << "tse: recv_buffer size is: " <<
376 recv_buffer_.size()
377 << " local: " << get_address_port_local()
378 << " remote: " << get_address_port_remote();
379 }
380 }
381 buffer_size = missing_capacity_;
382 missing_capacity_ = 0;
383 } else if (buffer_shrink_threshold_
384 && shrink_count_ > buffer_shrink_threshold_
385 && recv_buffer_size_ == 0) {
386 recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
387 recv_buffer_.shrink_to_fit();
388 buffer_size = recv_buffer_size_initial_;
389 shrink_count_ = 0;
390 }
391 } catch (const std::exception &e) {
392 handle_recv_buffer_exception(e);
393 // don't start receiving again
394 return;
395 }
396 socket_.async_receive(boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size),
397 std::bind(&tcp_server_endpoint_impl::connection::receive_cbk,
398 shared_from_this(), std::placeholders::_1,
399 std::placeholders::_2));
400 }
401 }
402
stop()403 void tcp_server_endpoint_impl::connection::stop() {
404 std::lock_guard<std::mutex> its_lock(socket_mutex_);
405 if (socket_.is_open()) {
406 boost::system::error_code its_error;
407 socket_.shutdown(socket_.shutdown_both, its_error);
408 socket_.close(its_error);
409 }
410 }
411
send_queued(const queue_iterator_type _queue_iterator)412 void tcp_server_endpoint_impl::connection::send_queued(
413 const queue_iterator_type _queue_iterator) {
414 std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock());
415 if (!its_server) {
416 VSOMEIP_TRACE << "tcp_server_endpoint_impl::connection::send_queued "
417 " couldn't lock server_";
418 return;
419 }
420 message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
421 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
422 (*its_buffer)[VSOMEIP_SERVICE_POS_MIN],
423 (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]);
424 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
425 (*its_buffer)[VSOMEIP_METHOD_POS_MIN],
426 (*its_buffer)[VSOMEIP_METHOD_POS_MAX]);
427 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
428 (*its_buffer)[VSOMEIP_CLIENT_POS_MIN],
429 (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]);
430 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
431 (*its_buffer)[VSOMEIP_SESSION_POS_MIN],
432 (*its_buffer)[VSOMEIP_SESSION_POS_MAX]);
433 if (magic_cookies_enabled_) {
434 const std::chrono::steady_clock::time_point now =
435 std::chrono::steady_clock::now();
436 if (std::chrono::duration_cast<std::chrono::milliseconds>(
437 now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
438 if (send_magic_cookie(its_buffer)) {
439 last_cookie_sent_ = now;
440 _queue_iterator->second.first += sizeof(SERVICE_COOKIE);
441 }
442 }
443 }
444
445 {
446 std::lock_guard<std::mutex> its_lock(socket_mutex_);
447 {
448 std::lock_guard<std::mutex> its_sent_lock(its_server->sent_mutex_);
449 its_server->is_sending_ = true;
450 }
451
452 boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer),
453 std::bind(&tcp_server_endpoint_impl::connection::write_completion_condition,
454 shared_from_this(),
455 std::placeholders::_1,
456 std::placeholders::_2,
457 its_buffer->size(),
458 its_service, its_method, its_client, its_session,
459 std::chrono::steady_clock::now()),
460 std::bind(&tcp_server_endpoint_base_impl::send_cbk,
461 its_server,
462 _queue_iterator,
463 std::placeholders::_1,
464 std::placeholders::_2));
465 }
466 }
467
send_queued_sync(const queue_iterator_type _queue_iterator)468 void tcp_server_endpoint_impl::connection::send_queued_sync(
469 const queue_iterator_type _queue_iterator) {
470 message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
471 if (magic_cookies_enabled_) {
472 const std::chrono::steady_clock::time_point now =
473 std::chrono::steady_clock::now();
474 if (std::chrono::duration_cast<std::chrono::milliseconds>(
475 now - last_cookie_sent_) > std::chrono::milliseconds(10000)) {
476 if (send_magic_cookie(its_buffer)) {
477 last_cookie_sent_ = now;
478 _queue_iterator->second.first += sizeof(SERVICE_COOKIE);
479 }
480 }
481 }
482
483 try {
484 std::lock_guard<std::mutex> its_lock(socket_mutex_);
485 boost::asio::write(socket_, boost::asio::buffer(*its_buffer));
486 } catch (const boost::system::system_error &e) {
487 if (e.code() != boost::asio::error::broken_pipe) {
488 VSOMEIP_ERROR << "tcp_server_endpoint_impl::connection::"
489 << __func__ << " " << e.what();
490 }
491 }
492 }
493
send_magic_cookie(message_buffer_ptr_t & _buffer)494 bool tcp_server_endpoint_impl::connection::send_magic_cookie(
495 message_buffer_ptr_t &_buffer) {
496 if (max_message_size_ == MESSAGE_SIZE_UNLIMITED
497 || max_message_size_ - _buffer->size() >=
498 VSOMEIP_SOMEIP_HEADER_SIZE + VSOMEIP_SOMEIP_MAGIC_COOKIE_SIZE) {
499 _buffer->insert(_buffer->begin(), SERVICE_COOKIE,
500 SERVICE_COOKIE + sizeof(SERVICE_COOKIE));
501 return true;
502 }
503 return false;
504 }
505
is_magic_cookie(size_t _offset) const506 bool tcp_server_endpoint_impl::connection::is_magic_cookie(size_t _offset) const {
507 return (0 == std::memcmp(CLIENT_COOKIE, &recv_buffer_[_offset],
508 sizeof(CLIENT_COOKIE)));
509 }
510
receive_cbk(boost::system::error_code const & _error,std::size_t _bytes)511 void tcp_server_endpoint_impl::connection::receive_cbk(
512 boost::system::error_code const &_error,
513 std::size_t _bytes) {
514 if (_error == boost::asio::error::operation_aborted) {
515 // endpoint was stopped
516 return;
517 }
518 std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock());
519 if (!its_server) {
520 VSOMEIP_ERROR << "tcp_server_endpoint_impl::connection::receive_cbk "
521 " couldn't lock server_";
522 return;
523 }
524 #if 0
525 std::stringstream msg;
526 for (std::size_t i = 0; i < _bytes + recv_buffer_size_; ++i)
527 msg << std::hex << std::setw(2) << std::setfill('0')
528 << (int) recv_buffer_[i] << " ";
529 VSOMEIP_INFO << msg.str();
530 #endif
531 std::shared_ptr<routing_host> its_host = its_server->routing_host_.lock();
532 if (its_host) {
533 if (!_error && 0 < _bytes) {
534 if (recv_buffer_size_ + _bytes < recv_buffer_size_) {
535 VSOMEIP_ERROR << "receive buffer overflow in tcp client endpoint ~> abort!";
536 return;
537 }
538 recv_buffer_size_ += _bytes;
539
540 size_t its_iteration_gap = 0;
541 bool has_full_message;
542 do {
543 uint64_t read_message_size
544 = utility::get_message_size(&recv_buffer_[its_iteration_gap],
545 recv_buffer_size_);
546 if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
547 VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
548 return;
549 }
550 uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
551 has_full_message = (current_message_size > VSOMEIP_RETURN_CODE_POS
552 && current_message_size <= recv_buffer_size_);
553 if (has_full_message) {
554 bool needs_forwarding(true);
555 if (is_magic_cookie(its_iteration_gap)) {
556 magic_cookies_enabled_ = true;
557 } else {
558 if (magic_cookies_enabled_) {
559 uint32_t its_offset
560 = its_server->find_magic_cookie(&recv_buffer_[its_iteration_gap],
561 recv_buffer_size_);
562 if (its_offset < current_message_size) {
563 {
564 std::lock_guard<std::mutex> its_lock(socket_mutex_);
565 VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing."
566 << " local: " << get_address_port_local()
567 << " remote: " << get_address_port_remote();
568 }
569 if (!is_magic_cookie(its_iteration_gap)) {
570 auto its_endpoint_host = its_server->endpoint_host_.lock();
571 if (its_endpoint_host) {
572 its_endpoint_host->on_error(&recv_buffer_[its_iteration_gap],
573 static_cast<length_t>(recv_buffer_size_),its_server.get(),
574 remote_address_, remote_port_);
575 }
576 }
577 current_message_size = its_offset;
578 needs_forwarding = false;
579 }
580 }
581 }
582 if (needs_forwarding) {
583 if (utility::is_request(
584 recv_buffer_[its_iteration_gap
585 + VSOMEIP_MESSAGE_TYPE_POS])) {
586 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
587 recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN],
588 recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MAX]);
589 if (its_client != MAGIC_COOKIE_CLIENT) {
590 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
591 recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MIN],
592 recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MAX]);
593 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
594 recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MIN],
595 recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MAX]);
596 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
597 recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN],
598 recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MAX]);
599
600 std::lock_guard<std::mutex> its_requests_guard(its_server->requests_mutex_);
601 its_server->requests_[its_client]
602 [std::make_tuple(its_service, its_method, its_session)] = remote_;
603 }
604 }
605 if (!magic_cookies_enabled_) {
606 its_host->on_message(&recv_buffer_[its_iteration_gap],
607 current_message_size, its_server.get(),
608 boost::asio::ip::address(),
609 VSOMEIP_ROUTING_CLIENT,
610 std::make_pair(ANY_UID, ANY_GID),
611 remote_address_, remote_port_);
612 } else {
613 // Only call on_message without a magic cookie in front of the buffer!
614 if (!is_magic_cookie(its_iteration_gap)) {
615 its_host->on_message(&recv_buffer_[its_iteration_gap],
616 current_message_size, its_server.get(),
617 boost::asio::ip::address(),
618 VSOMEIP_ROUTING_CLIENT,
619 std::make_pair(ANY_UID, ANY_GID),
620 remote_address_, remote_port_);
621 }
622 }
623 }
624 calculate_shrink_count();
625 missing_capacity_ = 0;
626 recv_buffer_size_ -= current_message_size;
627 its_iteration_gap += current_message_size;
628 } else if (magic_cookies_enabled_ && recv_buffer_size_ > 0) {
629 uint32_t its_offset =
630 its_server->find_magic_cookie(&recv_buffer_[its_iteration_gap],
631 recv_buffer_size_);
632 if (its_offset < recv_buffer_size_) {
633 {
634 std::lock_guard<std::mutex> its_lock(socket_mutex_);
635 VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing."
636 << " local: " << get_address_port_local()
637 << " remote: " << get_address_port_remote();
638 }
639 if (!is_magic_cookie(its_iteration_gap)) {
640 auto its_endpoint_host = its_server->endpoint_host_.lock();
641 if (its_endpoint_host) {
642 its_endpoint_host->on_error(&recv_buffer_[its_iteration_gap],
643 static_cast<length_t>(recv_buffer_size_), its_server.get(),
644 remote_address_, remote_port_);
645 }
646 }
647 recv_buffer_size_ -= its_offset;
648 its_iteration_gap += its_offset;
649 has_full_message = true; // trigger next loop
650 if (!is_magic_cookie(its_iteration_gap)) {
651 auto its_endpoint_host = its_server->endpoint_host_.lock();
652 if (its_endpoint_host) {
653 its_endpoint_host->on_error(&recv_buffer_[its_iteration_gap],
654 static_cast<length_t>(recv_buffer_size_), its_server.get(),
655 remote_address_, remote_port_);
656 }
657 }
658 }
659 }
660
661 if (!has_full_message) {
662 if (recv_buffer_size_ > VSOMEIP_RETURN_CODE_POS &&
663 (recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
664 !utility::is_valid_message_type(static_cast<message_type_e>(recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) ||
665 !utility::is_valid_return_code(static_cast<return_code_e>(recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))
666 )) {
667 if (recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
668 {
669 std::lock_guard<std::mutex> its_lock(socket_mutex_);
670 VSOMEIP_ERROR << "tse: Wrong protocol version: 0x"
671 << std::hex << std::setw(2) << std::setfill('0')
672 << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS])
673 << " local: " << get_address_port_local()
674 << " remote: " << get_address_port_remote()
675 << ". Closing connection due to missing/broken data TCP stream.";
676 }
677 // ensure to send back a error message w/ wrong protocol version
678 its_host->on_message(&recv_buffer_[its_iteration_gap],
679 VSOMEIP_SOMEIP_HEADER_SIZE + 8, its_server.get(),
680 boost::asio::ip::address(),
681 VSOMEIP_ROUTING_CLIENT,
682 std::make_pair(ANY_UID, ANY_GID),
683 remote_address_, remote_port_);
684 } else if (!utility::is_valid_message_type(static_cast<message_type_e>(
685 recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) {
686 std::lock_guard<std::mutex> its_lock(socket_mutex_);
687 VSOMEIP_ERROR << "tse: Invalid message type: 0x"
688 << std::hex << std::setw(2) << std::setfill('0')
689 << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])
690 << " local: " << get_address_port_local()
691 << " remote: " << get_address_port_remote()
692 << ". Closing connection due to missing/broken data TCP stream.";
693 } else if (!utility::is_valid_return_code(static_cast<return_code_e>(
694 recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))) {
695 std::lock_guard<std::mutex> its_lock(socket_mutex_);
696 VSOMEIP_ERROR << "tse: Invalid return code: 0x"
697 << std::hex << std::setw(2) << std::setfill('0')
698 << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS])
699 << " local: " << get_address_port_local()
700 << " remote: " << get_address_port_remote()
701 << ". Closing connection due to missing/broken data TCP stream.";
702 }
703 wait_until_sent(boost::asio::error::operation_aborted);
704 return;
705 } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED
706 && current_message_size > max_message_size_) {
707 recv_buffer_size_ = 0;
708 recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
709 recv_buffer_.shrink_to_fit();
710 if (magic_cookies_enabled_) {
711 std::lock_guard<std::mutex> its_lock(socket_mutex_);
712 VSOMEIP_ERROR << "Received a TCP message which exceeds "
713 << "maximum message size ("
714 << std::dec << current_message_size
715 << " > " << std::dec << max_message_size_
716 << "). Magic Cookies are enabled: "
717 << "Resetting receiver. local: "
718 << get_address_port_local() << " remote: "
719 << get_address_port_remote();
720 } else {
721 {
722 std::lock_guard<std::mutex> its_lock(socket_mutex_);
723 VSOMEIP_ERROR << "Received a TCP message which exceeds "
724 << "maximum message size ("
725 << std::dec << current_message_size
726 << " > " << std::dec << max_message_size_
727 << ") Magic cookies are disabled: "
728 << "Connection will be closed! local: "
729 << get_address_port_local() << " remote: "
730 << get_address_port_remote();
731 }
732 wait_until_sent(boost::asio::error::operation_aborted);
733 return;
734 }
735 } else if (current_message_size > recv_buffer_size_) {
736 missing_capacity_ = current_message_size
737 - static_cast<std::uint32_t>(recv_buffer_size_);
738 } else if (VSOMEIP_SOMEIP_HEADER_SIZE > recv_buffer_size_) {
739 missing_capacity_ = VSOMEIP_SOMEIP_HEADER_SIZE
740 - static_cast<std::uint32_t>(recv_buffer_size_);
741 } else if (magic_cookies_enabled_ && recv_buffer_size_ > 0) {
742 // no need to check for magic cookie here again: has_full_message
743 // would have been set to true if there was one present in the data
744 recv_buffer_size_ = 0;
745 recv_buffer_.resize(recv_buffer_size_initial_, 0x0);
746 recv_buffer_.shrink_to_fit();
747 missing_capacity_ = 0;
748 std::lock_guard<std::mutex> its_lock(socket_mutex_);
749 VSOMEIP_ERROR << "Didn't find magic cookie in broken"
750 << " data, trying to resync."
751 << " local: " << get_address_port_local()
752 << " remote: " << get_address_port_remote();
753 } else {
754 {
755 std::lock_guard<std::mutex> its_lock(socket_mutex_);
756 VSOMEIP_ERROR << "tse::c<" << this
757 << ">rcb: recv_buffer_size is: " << std::dec
758 << recv_buffer_size_ << " but couldn't read "
759 "out message_size. recv_buffer_capacity: "
760 << recv_buffer_.capacity()
761 << " its_iteration_gap: " << its_iteration_gap
762 << "local: " << get_address_port_local()
763 << " remote: " << get_address_port_remote()
764 << ". Closing connection due to missing/broken data TCP stream.";
765 }
766 wait_until_sent(boost::asio::error::operation_aborted);
767 return;
768 }
769 }
770 } while (has_full_message && recv_buffer_size_);
771 if (its_iteration_gap) {
772 // Copy incomplete message to front for next receive_cbk iteration
773 for (size_t i = 0; i < recv_buffer_size_; ++i) {
774 recv_buffer_[i] = recv_buffer_[i + its_iteration_gap];
775 }
776 // Still more capacity needed after shifting everything to front?
777 if (missing_capacity_ &&
778 missing_capacity_ <= recv_buffer_.capacity() - recv_buffer_size_) {
779 missing_capacity_ = 0;
780 }
781 }
782 receive();
783 }
784 }
785 if (_error == boost::asio::error::eof
786 || _error == boost::asio::error::connection_reset
787 || _error == boost::asio::error::timed_out) {
788 if(_error == boost::asio::error::timed_out) {
789 std::lock_guard<std::mutex> its_lock(socket_mutex_);
790 VSOMEIP_WARNING << "tcp_server_endpoint receive_cbk: " << _error.message()
791 << " local: " << get_address_port_local()
792 << " remote: " << get_address_port_remote();
793 }
794 wait_until_sent(boost::asio::error::operation_aborted);
795 }
796 }
797
calculate_shrink_count()798 void tcp_server_endpoint_impl::connection::calculate_shrink_count() {
799 if (buffer_shrink_threshold_) {
800 if (recv_buffer_.capacity() != recv_buffer_size_initial_) {
801 if (recv_buffer_size_ < (recv_buffer_.capacity() >> 1)) {
802 shrink_count_++;
803 } else {
804 shrink_count_ = 0;
805 }
806 }
807 }
808 }
809
set_remote_info(const endpoint_type & _remote)810 void tcp_server_endpoint_impl::connection::set_remote_info(
811 const endpoint_type &_remote) {
812 remote_ = _remote;
813 remote_address_ = _remote.address();
814 remote_port_ = _remote.port();
815 }
816
get_address_port_remote() const817 const std::string tcp_server_endpoint_impl::connection::get_address_port_remote() const {
818 std::string its_address_port;
819 its_address_port.reserve(21);
820 boost::system::error_code ec;
821 its_address_port += remote_address_.to_string(ec);
822 its_address_port += ":";
823 its_address_port += std::to_string(remote_port_);
824 return its_address_port;
825 }
826
get_address_port_local() const827 const std::string tcp_server_endpoint_impl::connection::get_address_port_local() const {
828 std::string its_address_port;
829 its_address_port.reserve(21);
830 boost::system::error_code ec;
831 if (socket_.is_open()) {
832 endpoint_type its_local_endpoint = socket_.local_endpoint(ec);
833 if (!ec) {
834 its_address_port += its_local_endpoint.address().to_string(ec);
835 its_address_port += ":";
836 its_address_port += std::to_string(its_local_endpoint.port());
837 }
838 }
839 return its_address_port;
840 }
841
handle_recv_buffer_exception(const std::exception & _e)842 void tcp_server_endpoint_impl::connection::handle_recv_buffer_exception(
843 const std::exception &_e) {
844 std::stringstream its_message;
845 its_message <<"tcp_server_endpoint_impl::connection catched exception"
846 << _e.what() << " local: " << get_address_port_local()
847 << " remote: " << get_address_port_remote()
848 << " shutting down connection. Start of buffer: ";
849
850 for (std::size_t i = 0; i < recv_buffer_size_ && i < 16; i++) {
851 its_message << std::setw(2) << std::setfill('0') << std::hex
852 << (int) (recv_buffer_[i]) << " ";
853 }
854
855 its_message << " Last 16 Bytes captured: ";
856 for (int i = 15; recv_buffer_size_ > 15 && i >= 0; i--) {
857 its_message << std::setw(2) << std::setfill('0') << std::hex
858 << (int) (recv_buffer_[static_cast<size_t>(i)]) << " ";
859 }
860 VSOMEIP_ERROR << its_message.str();
861 recv_buffer_.clear();
862 if (socket_.is_open()) {
863 boost::system::error_code its_error;
864 socket_.shutdown(socket_.shutdown_both, its_error);
865 socket_.close(its_error);
866 }
867 std::shared_ptr<tcp_server_endpoint_impl> its_server = server_.lock();
868 if (its_server) {
869 its_server->remove_connection(this);
870 }
871 }
872
873 std::size_t
get_recv_buffer_capacity() const874 tcp_server_endpoint_impl::connection::get_recv_buffer_capacity() const {
875 return recv_buffer_.capacity();
876 }
877
878 std::size_t
write_completion_condition(const boost::system::error_code & _error,std::size_t _bytes_transferred,std::size_t _bytes_to_send,service_t _service,method_t _method,client_t _client,session_t _session,const std::chrono::steady_clock::time_point _start)879 tcp_server_endpoint_impl::connection::write_completion_condition(
880 const boost::system::error_code& _error,
881 std::size_t _bytes_transferred, std::size_t _bytes_to_send,
882 service_t _service, method_t _method, client_t _client, session_t _session,
883 const std::chrono::steady_clock::time_point _start) {
884 if (_error) {
885 VSOMEIP_ERROR << "tse::write_completion_condition: "
886 << _error.message() << "(" << std::dec << _error.value()
887 << ") bytes transferred: " << std::dec << _bytes_transferred
888 << " bytes to sent: " << std::dec << _bytes_to_send << " "
889 << "remote:" << get_address_port_remote() << " ("
890 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
891 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
892 << std::hex << std::setw(4) << std::setfill('0') << _method << "."
893 << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
894 stop_and_remove_connection();
895 return 0;
896 }
897
898 const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
899 const std::chrono::milliseconds passed = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start);
900 if (passed > send_timeout_warning_) {
901 if (passed > send_timeout_) {
902 VSOMEIP_ERROR << "tse::write_completion_condition: "
903 << _error.message() << "(" << std::dec << _error.value()
904 << ") took longer than " << std::dec << send_timeout_.count()
905 << "ms bytes transferred: " << std::dec << _bytes_transferred
906 << " bytes to sent: " << std::dec << _bytes_to_send
907 << " remote:" << get_address_port_remote() << " ("
908 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
909 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
910 << std::hex << std::setw(4) << std::setfill('0') << _method << "."
911 << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
912 } else {
913 VSOMEIP_WARNING << "tse::write_completion_condition: "
914 << _error.message() << "(" << std::dec << _error.value()
915 << ") took longer than " << std::dec << send_timeout_warning_.count()
916 << "ms bytes transferred: " << std::dec << _bytes_transferred
917 << " bytes to sent: " << std::dec << _bytes_to_send
918 << " remote:" << get_address_port_remote() << " ("
919 << std::hex << std::setw(4) << std::setfill('0') << _client <<"): ["
920 << std::hex << std::setw(4) << std::setfill('0') << _service << "."
921 << std::hex << std::setw(4) << std::setfill('0') << _method << "."
922 << std::hex << std::setw(4) << std::setfill('0') << _session << "]";
923 }
924 }
925 return _bytes_to_send - _bytes_transferred;
926 }
927
stop_and_remove_connection()928 void tcp_server_endpoint_impl::connection::stop_and_remove_connection() {
929 std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock());
930 if (!its_server) {
931 VSOMEIP_ERROR << "tse::connection::stop_and_remove_connection "
932 " couldn't lock server_";
933 return;
934 }
935 {
936 std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_);
937 stop();
938 }
939 its_server->remove_connection(this);
940 }
941
942 // Dummies
// No-op implementation of receive() on the server endpoint itself.
// NOTE(review): receiving appears to be handled by the individual connection
// objects (connection::receive_cbk re-arms reception) - confirm against the
// class declaration.
void tcp_server_endpoint_impl::receive() {
    // intentionally left empty
}
946
print_status()947 void tcp_server_endpoint_impl::print_status() {
948 std::lock_guard<std::mutex> its_lock(mutex_);
949 connections_t its_connections;
950 {
951 std::lock_guard<std::mutex> its_lock(connections_mutex_);
952 its_connections = connections_;
953 }
954
955 VSOMEIP_INFO << "status tse: " << std::dec << local_port_
956 << " connections: " << std::dec << its_connections.size()
957 << " queues: " << std::dec << queues_.size();
958 for (const auto &c : its_connections) {
959 std::size_t its_data_size(0);
960 std::size_t its_queue_size(0);
961 std::size_t its_recv_size(0);
962 {
963 std::unique_lock<std::mutex> c_s_lock(c.second->get_socket_lock());
964 its_recv_size = c.second->get_recv_buffer_capacity();
965 }
966 auto found_queue = queues_.find(c.first);
967 if (found_queue != queues_.end()) {
968 its_queue_size = found_queue->second.second.size();
969 its_data_size = found_queue->second.first;
970 }
971 VSOMEIP_INFO << "status tse: client: "
972 << c.second->get_address_port_remote()
973 << " queue: " << std::dec << its_queue_size
974 << " data: " << std::dec << its_data_size
975 << " recv_buffer: " << std::dec << its_recv_size;
976 }
977 }
978
get_remote_information(const queue_iterator_type _queue_iterator) const979 std::string tcp_server_endpoint_impl::get_remote_information(
980 const queue_iterator_type _queue_iterator) const {
981 boost::system::error_code ec;
982 return _queue_iterator->first.address().to_string(ec) + ":"
983 + std::to_string(_queue_iterator->first.port());
984 }
985
get_remote_information(const endpoint_type & _remote) const986 std::string tcp_server_endpoint_impl::get_remote_information(
987 const endpoint_type& _remote) const {
988 boost::system::error_code ec;
989 return _remote.address().to_string(ec) + ":"
990 + std::to_string(_remote.port());
991 }
992
tp_segmentation_enabled(service_t _service,method_t _method) const993 bool tcp_server_endpoint_impl::tp_segmentation_enabled(service_t _service,
994 method_t _method) const {
995 (void)_service;
996 (void)_method;
997 return false;
998 }
999
wait_until_sent(const boost::system::error_code & _error)1000 void tcp_server_endpoint_impl::connection::wait_until_sent(const boost::system::error_code &_error) {
1001 std::shared_ptr<tcp_server_endpoint_impl> its_server(server_.lock());
1002 std::unique_lock<std::mutex> its_sent_lock(its_server->sent_mutex_);
1003 if (!its_server->is_sending_ || !_error) {
1004 its_sent_lock.unlock();
1005 if (!_error)
1006 VSOMEIP_WARNING << __func__
1007 << ": Maximum wait time for send operation exceeded for tse.";
1008 {
1009 std::lock_guard<std::mutex> its_lock(its_server->connections_mutex_);
1010 stop();
1011 }
1012 its_server->remove_connection(this);
1013 } else {
1014 std::chrono::milliseconds its_timeout(VSOMEIP_MAX_TCP_SENT_WAIT_TIME);
1015 boost::system::error_code ec;
1016 its_server->sent_timer_.expires_from_now(its_timeout, ec);
1017 its_server->sent_timer_.async_wait(std::bind(&tcp_server_endpoint_impl::connection::wait_until_sent,
1018 std::dynamic_pointer_cast<tcp_server_endpoint_impl::connection>(shared_from_this()),
1019 std::placeholders::_1));
1020 }
1021 }
1022
1023 } // namespace vsomeip_v3
1024