// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include #include #include #include #include #include #include #include #include #include #include #include "../include/client_endpoint_impl.hpp" #include "../include/endpoint_host.hpp" #include "../../utility/include/utility.hpp" #include "../../utility/include/byteorder.hpp" namespace vsomeip_v3 { template client_endpoint_impl::client_endpoint_impl( const std::shared_ptr& _endpoint_host, const std::shared_ptr& _routing_host, const endpoint_type& _local, const endpoint_type& _remote, boost::asio::io_service &_io, std::uint32_t _max_message_size, configuration::endpoint_queue_limit_t _queue_limit, const std::shared_ptr& _configuration) : endpoint_impl(_endpoint_host, _routing_host, _local, _io, _max_message_size, _queue_limit, _configuration), socket_(new socket_type(_io)), remote_(_remote), flush_timer_(_io), connect_timer_(_io), connect_timeout_(VSOMEIP_DEFAULT_CONNECT_TIMEOUT), // TODO: use config variable state_(cei_state_e::CLOSED), reconnect_counter_(0), train_(_io), queue_size_(0), was_not_connected_(false), local_port_(0), strand_(_io) { } template client_endpoint_impl::~client_endpoint_impl() { } template bool client_endpoint_impl::is_client() const { return true; } template bool client_endpoint_impl::is_established() const { return state_ == cei_state_e::ESTABLISHED; } template bool client_endpoint_impl::is_established_or_connected() const { return (state_ == cei_state_e::ESTABLISHED || state_ == cei_state_e::CONNECTED); } template void client_endpoint_impl::set_established(bool _established) { if (_established) { if (state_ != cei_state_e::CONNECTING) { std::lock_guard its_lock(socket_mutex_); if (socket_->is_open()) { state_ = cei_state_e::ESTABLISHED; } else { state_ = cei_state_e::CLOSED; } } } else { state_ = cei_state_e::CLOSED; } } template void client_endpoint_impl::set_connected(bool _connected) { if (_connected) { std::lock_guard its_lock(socket_mutex_); if (socket_->is_open()) { state_ = cei_state_e::CONNECTED; } else { state_ = cei_state_e::CLOSED; } } else { state_ = cei_state_e::CLOSED; } } template void client_endpoint_impl::prepare_stop( endpoint::prepare_stop_handler_t _handler, service_t _service) { (void) _handler; (void) _service; } template void client_endpoint_impl::stop() { { std::lock_guard its_lock(mutex_); endpoint_impl::sending_blocked_ = true; // delete unsent messages queue_.clear(); queue_size_ = 0; } { std::lock_guard its_lock(connect_timer_mutex_); boost::system::error_code ec; connect_timer_.cancel(ec); } connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; // bind to strand as stop() might be called from different thread strand_.dispatch(std::bind(&client_endpoint_impl::shutdown_and_close_socket, this->shared_from_this(), false) ); } template message_buffer_ptr_t client_endpoint_impl::get_front() { message_buffer_ptr_t its_buffer; if (queue_.size()) its_buffer = queue_.front(); return (its_buffer); } template bool client_endpoint_impl::send_to( const std::shared_ptr _target, const byte_t *_data, uint32_t _size) { (void)_target; (void)_data; (void)_size; VSOMEIP_ERROR << "Clients endpoints must not be used to " << "send to explicitely specified targets"; return false; } template bool client_endpoint_impl::send_error( const std::shared_ptr _target, const byte_t *_data, uint32_t _size) { (void)_target; (void)_data; (void)_size; VSOMEIP_ERROR << "Clients endpoints must not be used to " << "send errors to explicitly specified targets"; return false; } template bool client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) { std::lock_guard its_lock(mutex_); bool must_depart(false); const bool queue_size_zero_on_entry(queue_.empty()); #if 0 std::stringstream msg; msg << "cei::send: "; for (uint32_t i = 0; i < _size; i++) msg << std::hex << std::setw(2) << std::setfill('0') << (int)_data[i] << " "; VSOMEIP_DEBUG << msg.str(); #endif if (endpoint_impl::sending_blocked_ || !check_queue_limit(_data, _size)) { return false; } switch (check_message_size(_data, _size)) { case endpoint_impl::cms_ret_e::MSG_WAS_SPLIT: return true; break; case endpoint_impl::cms_ret_e::MSG_TOO_BIG: return false; break; case endpoint_impl::cms_ret_e::MSG_OK: default: break; } // STEP 1: Determine elapsed time and update the departure time and cancel the departure timer train_.update_departure_time_and_stop_departure(); // STEP 3: Get configured timings const service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); const method_t its_method = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); std::chrono::nanoseconds its_debouncing(0), its_retention(0); get_configured_times_from_endpoint(its_service, its_method, &its_debouncing, &its_retention); // STEP 4: Check if the passenger enters an empty train const std::pair its_identifier = std::make_pair( its_service, its_method); if (train_.passengers_.empty()) { train_.departure_ = its_retention; } else { // STEP 4.1: Check whether the current train already contains the message if (train_.passengers_.end() != train_.passengers_.find(its_identifier)) { must_depart = true; } else { // STEP 5: Check whether the current message fits into the current train if (train_.buffer_->size() + _size > endpoint_impl::max_message_size_) { must_depart = true; } else { // STEP 6: Check debouncing time if (its_debouncing > train_.minimal_max_retention_time_) { // train's latest departure would already undershot new // passenger's debounce time must_depart = true; } else { if (its_debouncing > train_.departure_) { // train departs earlier as the new passenger's debounce // time allows must_depart = true; } else { // STEP 7: Check maximum retention time if (its_retention < train_.minimal_debounce_time_) { // train's earliest departure would already exceed // the new passenger's retention time. must_depart = true; } else { if (its_retention < train_.departure_) { train_.departure_ = its_retention; } } } } } } } // STEP 8: if necessary, send current buffer and create a new one if (must_depart) { // STEP 8.1: check if debounce time would be undershot here if the train // departs. Block sending until train is allowed to depart. wait_until_debounce_time_reached(); train_.passengers_.clear(); queue_train(queue_size_zero_on_entry); train_.last_departure_ = std::chrono::steady_clock::now(); train_.departure_ = its_retention; train_.minimal_debounce_time_ = std::chrono::nanoseconds::max(); train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max(); } // STEP 9: insert current message buffer train_.buffer_->insert(train_.buffer_->end(), _data, _data + _size); train_.passengers_.insert(its_identifier); // STEP 9.1: update the trains minimal debounce time if necessary if (its_debouncing < train_.minimal_debounce_time_) { train_.minimal_debounce_time_ = its_debouncing; } // STEP 9.2: update the trains minimal maximum retention time if necessary if (its_retention < train_.minimal_max_retention_time_) { train_.minimal_max_retention_time_ = its_retention; } // STEP 10: restart timer with current departure time #ifndef _WIN32 train_.departure_timer_->expires_from_now(train_.departure_); #else train_.departure_timer_->expires_from_now( std::chrono::duration_cast< std::chrono::steady_clock::duration>(train_.departure_)); #endif train_.departure_timer_->async_wait( std::bind(&client_endpoint_impl::flush_cbk, this->shared_from_this(), std::placeholders::_1)); return true; } template void client_endpoint_impl::send_segments( const tp::tp_split_messages_t &_segments) { if (_segments.size() == 0) { return; } const bool queue_size_zero_on_entry(queue_.empty()); const service_t its_service = VSOMEIP_BYTES_TO_WORD( (*(_segments[0]))[VSOMEIP_SERVICE_POS_MIN], (*(_segments[0]))[VSOMEIP_SERVICE_POS_MAX]); const method_t its_method = VSOMEIP_BYTES_TO_WORD( (*(_segments[0]))[VSOMEIP_METHOD_POS_MIN], (*(_segments[0]))[VSOMEIP_METHOD_POS_MAX]); std::chrono::nanoseconds its_debouncing(0), its_retention(0); get_configured_times_from_endpoint(its_service, its_method, &its_debouncing, &its_retention); // update the trains minimal debounce time if necessary if (its_debouncing < train_.minimal_debounce_time_) { train_.minimal_debounce_time_ = its_debouncing; } // update the trains minimal maximum retention time if necessary if (its_retention < train_.minimal_max_retention_time_) { train_.minimal_max_retention_time_ = its_retention; } // We only need to respect the debouncing. There is no need to wait for further // messages as we will send several now anyway. if (!train_.passengers_.empty()) { wait_until_debounce_time_reached(); train_.passengers_.clear(); queue_train(queue_size_zero_on_entry); train_.last_departure_ = std::chrono::steady_clock::now(); train_.departure_ = its_retention; train_.minimal_debounce_time_ = std::chrono::nanoseconds::max(); train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max(); } const bool queue_size_still_zero(queue_.empty()); for (const auto& s : _segments) { queue_.emplace_back(s); queue_size_ += s->size(); } if (queue_size_still_zero && !queue_.empty()) { // no writing in progress // respect minimal debounce time wait_until_debounce_time_reached(); // ignore retention time and send immediately as the train is full anyway auto its_buffer = get_front(); if (its_buffer) { strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, this->shared_from_this(), its_buffer)); } } train_.last_departure_ = std::chrono::steady_clock::now(); } template void client_endpoint_impl::wait_until_debounce_time_reached() const { const std::chrono::nanoseconds time_since_last_departure = std::chrono::duration_cast( std::chrono::steady_clock::now() - train_.last_departure_); if (time_since_last_departure < train_.minimal_debounce_time_) { std::this_thread::sleep_for( train_.minimal_debounce_time_ - time_since_last_departure); } } template bool client_endpoint_impl::send(const std::vector& _cmd_header, const byte_t *_data, uint32_t _size) { (void) _cmd_header; (void) _data; (void) _size; return false; } template bool client_endpoint_impl::flush() { bool is_successful(true); std::lock_guard its_lock(mutex_); if (!train_.buffer_->empty()) { queue_train(!queue_.size()); train_.last_departure_ = std::chrono::steady_clock::now(); train_.passengers_.clear(); train_.minimal_debounce_time_ = std::chrono::nanoseconds::max(); train_.minimal_max_retention_time_ = std::chrono::nanoseconds::max(); } else { is_successful = false; } return is_successful; } template void client_endpoint_impl::connect_cbk( boost::system::error_code const &_error) { if (_error == boost::asio::error::operation_aborted || endpoint_impl::sending_blocked_) { // endpoint was stopped shutdown_and_close_socket(false); return; } std::shared_ptr its_host = this->endpoint_host_.lock(); if (its_host) { if (_error && _error != boost::asio::error::already_connected) { shutdown_and_close_socket(true); if (state_ != cei_state_e::ESTABLISHED) { state_ = cei_state_e::CLOSED; its_host->on_disconnect(this->shared_from_this()); } if (get_max_allowed_reconnects() == MAX_RECONNECTS_UNLIMITED || get_max_allowed_reconnects() >= ++reconnect_counter_) { start_connect_timer(); } else { max_allowed_reconnects_reached(); } // Double the timeout as long as the maximum allowed is larger if (connect_timeout_ < VSOMEIP_MAX_CONNECT_TIMEOUT) connect_timeout_ = (connect_timeout_ << 1); } else { { std::lock_guard its_lock(connect_timer_mutex_); connect_timer_.cancel(); } connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT; // TODO: use config variable reconnect_counter_ = 0; set_local_port(); if (was_not_connected_) { was_not_connected_ = false; std::lock_guard its_lock(mutex_); auto its_buffer = get_front(); if (its_buffer) { strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, this->shared_from_this(), its_buffer)); VSOMEIP_WARNING << __func__ << ": resume sending to: " << get_remote_information(); } } if (state_ != cei_state_e::ESTABLISHED) { its_host->on_connect(this->shared_from_this()); } receive(); } } } template void client_endpoint_impl::wait_connect_cbk( boost::system::error_code const &_error) { if (!_error && !client_endpoint_impl::sending_blocked_) { auto self = this->shared_from_this(); strand_.dispatch(std::bind(&client_endpoint_impl::connect, this->shared_from_this())); } } template void client_endpoint_impl::send_cbk( boost::system::error_code const &_error, std::size_t _bytes, const message_buffer_ptr_t& _sent_msg) { (void)_bytes; if (!_error) { std::lock_guard its_lock(mutex_); if (queue_.size() > 0) { queue_size_ -= queue_.front()->size(); queue_.pop_front(); auto its_buffer = get_front(); if (its_buffer) send_queued(its_buffer); } } else if (_error == boost::asio::error::broken_pipe) { state_ = cei_state_e::CLOSED; bool stopping(false); { std::lock_guard its_lock(mutex_); stopping = endpoint_impl::sending_blocked_; if (stopping) { queue_.clear(); queue_size_ = 0; } else { service_t its_service(0); method_t its_method(0); client_t its_client(0); session_t its_session(0); if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) { its_service = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN], (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]); its_method = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_METHOD_POS_MIN], (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]); its_client = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN], (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]); its_session = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_SESSION_POS_MIN], (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]); } VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message() << " (" << std::dec << _error.value() << ") " << get_remote_information() << " " << std::dec << queue_.size() << " " << std::dec << queue_size_ << " (" << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_method << "." << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"; } } if (!stopping) { print_status(); } was_not_connected_ = true; shutdown_and_close_socket(true); strand_.dispatch(std::bind(&client_endpoint_impl::connect, this->shared_from_this())); } else if (_error == boost::asio::error::not_connected || _error == boost::asio::error::bad_descriptor || _error == boost::asio::error::no_permission) { state_ = cei_state_e::CLOSED; if (_error == boost::asio::error::no_permission) { VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message() << " (" << std::dec << _error.value() << ") " << get_remote_information(); std::lock_guard its_lock(mutex_); queue_.clear(); queue_size_ = 0; } was_not_connected_ = true; shutdown_and_close_socket(true); strand_.dispatch(std::bind(&client_endpoint_impl::connect, this->shared_from_this())); } else if (_error == boost::asio::error::operation_aborted) { VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message(); // endpoint was stopped endpoint_impl::sending_blocked_ = true; shutdown_and_close_socket(false); } else if (_error == boost::system::errc::destination_address_required) { VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message() << " (" << std::dec << _error.value() << ") " << get_remote_information(); was_not_connected_ = true; } else { service_t its_service(0); method_t its_method(0); client_t its_client(0); session_t its_session(0); if (_sent_msg && _sent_msg->size() > VSOMEIP_SESSION_POS_MAX) { its_service = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_SERVICE_POS_MIN], (*_sent_msg)[VSOMEIP_SERVICE_POS_MAX]); its_method = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_METHOD_POS_MIN], (*_sent_msg)[VSOMEIP_METHOD_POS_MAX]); its_client = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_CLIENT_POS_MIN], (*_sent_msg)[VSOMEIP_CLIENT_POS_MAX]); its_session = VSOMEIP_BYTES_TO_WORD( (*_sent_msg)[VSOMEIP_SESSION_POS_MIN], (*_sent_msg)[VSOMEIP_SESSION_POS_MAX]); } VSOMEIP_WARNING << "cei::send_cbk received error: " << _error.message() << " (" << std::dec << _error.value() << ") " << get_remote_information() << " " << " " << std::dec << queue_.size() << " " << std::dec << queue_size_ << " (" << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_method << "." << std::hex << std::setw(4) << std::setfill('0') << its_session << "]"; print_status(); } } template void client_endpoint_impl::flush_cbk( boost::system::error_code const &_error) { if (!_error) { (void) flush(); } } template void client_endpoint_impl::shutdown_and_close_socket(bool _recreate_socket) { std::lock_guard its_lock(socket_mutex_); shutdown_and_close_socket_unlocked(_recreate_socket); } template void client_endpoint_impl::shutdown_and_close_socket_unlocked(bool _recreate_socket) { local_port_ = 0; if (socket_->is_open()) { #ifndef _WIN32 if (-1 == fcntl(socket_->native_handle(), F_GETFD)) { VSOMEIP_ERROR << "cei::shutdown_and_close_socket_unlocked: socket/handle closed already '" << std::string(std::strerror(errno)) << "' (" << errno << ") " << get_remote_information(); } #endif boost::system::error_code its_error; socket_->shutdown(Protocol::socket::shutdown_both, its_error); socket_->close(its_error); } if (_recreate_socket) { socket_.reset(new socket_type(endpoint_impl::service_)); } } template bool client_endpoint_impl::get_remote_address( boost::asio::ip::address &_address) const { (void)_address; return false; } template std::uint16_t client_endpoint_impl::get_remote_port() const { return 0; } template std::uint16_t client_endpoint_impl::get_local_port() const { return local_port_; } template void client_endpoint_impl::set_local_port(uint16_t _port) { local_port_ = _port; } template void client_endpoint_impl::start_connect_timer() { std::lock_guard its_lock(connect_timer_mutex_); connect_timer_.expires_from_now( std::chrono::milliseconds(connect_timeout_)); connect_timer_.async_wait( std::bind(&client_endpoint_impl::wait_connect_cbk, this->shared_from_this(), std::placeholders::_1)); } template typename endpoint_impl::cms_ret_e client_endpoint_impl::check_message_size( const std::uint8_t * const _data, std::uint32_t _size) { typename endpoint_impl::cms_ret_e ret(endpoint_impl::cms_ret_e::MSG_OK); if (endpoint_impl::max_message_size_ != MESSAGE_SIZE_UNLIMITED && _size > endpoint_impl::max_message_size_) { if (endpoint_impl::is_supporting_someip_tp_ && _data != nullptr) { const service_t its_service = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); const method_t its_method = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); if (tp_segmentation_enabled(its_service, its_method)) { send_segments(tp::tp::tp_split_message(_data, _size)); return endpoint_impl::cms_ret_e::MSG_WAS_SPLIT; } } VSOMEIP_ERROR << "cei::check_message_size: Dropping too big message (" << std::dec << _size << " Bytes). Maximum allowed message size is: " << endpoint_impl::max_message_size_ << " Bytes."; ret = endpoint_impl::cms_ret_e::MSG_TOO_BIG; } return ret; } template bool client_endpoint_impl::check_queue_limit(const uint8_t *_data, std::uint32_t _size) const { if (endpoint_impl::queue_limit_ != QUEUE_SIZE_UNLIMITED && queue_size_ + _size > endpoint_impl::queue_limit_) { service_t its_service(0); method_t its_method(0); client_t its_client(0); session_t its_session(0); if (_size >= VSOMEIP_SESSION_POS_MAX) { // this will yield wrong IDs for local communication as the commands // are prepended to the actual payload // it will print: // (lowbyte service ID + highbyte methoid) // [(Command + lowerbyte sender's client ID). // highbyte sender's client ID + lowbyte command size. // lowbyte methodid + highbyte vsomeip length] its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN], _data[VSOMEIP_CLIENT_POS_MAX]); its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]); } VSOMEIP_ERROR << "cei::check_queue_limit: queue size limit (" << std::dec << endpoint_impl::queue_limit_ << ") reached. Dropping message (" << std::hex << std::setw(4) << std::setfill('0') << its_client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << its_service << "." << std::hex << std::setw(4) << std::setfill('0') << its_method << "." << std::hex << std::setw(4) << std::setfill('0') << its_session << "] " << "queue_size: " << std::dec << queue_size_ << " data size: " << std::dec << _size; return false; } return true; } template void client_endpoint_impl::queue_train(bool _queue_size_zero_on_entry) { queue_.push_back(train_.buffer_); queue_size_ += train_.buffer_->size(); train_.buffer_ = std::make_shared(); if (_queue_size_zero_on_entry && !queue_.empty()) { // no writing in progress auto its_buffer = get_front(); if (its_buffer) { strand_.dispatch(std::bind(&client_endpoint_impl::send_queued, this->shared_from_this(), its_buffer)); } } } template size_t client_endpoint_impl::get_queue_size() const { std::lock_guard its_lock(mutex_); return queue_size_; } // Instantiate template #ifndef _WIN32 template class client_endpoint_impl; #endif template class client_endpoint_impl; template class client_endpoint_impl; } // namespace vsomeip_v3