1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <iomanip>
7 #include <sstream>
8
9 #include <boost/asio/ip/multicast.hpp>
10
11 #include <vsomeip/constants.hpp>
12 #include <vsomeip/internal/logger.hpp>
13
14 #include "../include/endpoint_definition.hpp"
15 #include "../include/endpoint_host.hpp"
16 #include "../include/tp.hpp"
17 #include "../../routing/include/routing_host.hpp"
18 #include "../include/udp_server_endpoint_impl.hpp"
19 #include "../../configuration/include/configuration.hpp"
20 #include "../../utility/include/byteorder.hpp"
21 #include "../../utility/include/utility.hpp"
22 #include "../../service_discovery/include/defines.hpp"
23
24 namespace ip = boost::asio::ip;
25
26 namespace vsomeip_v3 {
27
udp_server_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,const endpoint_type & _local,boost::asio::io_service & _io,const std::shared_ptr<configuration> & _configuration)28 udp_server_endpoint_impl::udp_server_endpoint_impl(
29 const std::shared_ptr<endpoint_host>& _endpoint_host,
30 const std::shared_ptr<routing_host>& _routing_host,
31 const endpoint_type& _local,
32 boost::asio::io_service &_io,
33 const std::shared_ptr<configuration>& _configuration) :
34 server_endpoint_impl<ip::udp_ext>(_endpoint_host, _routing_host, _local,
35 _io, VSOMEIP_MAX_UDP_MESSAGE_SIZE,
36 _configuration->get_endpoint_queue_limit(_configuration->get_unicast_address().to_string(), _local.port()),
37 _configuration),
38 unicast_socket_(_io, _local.protocol()),
39 unicast_recv_buffer_(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0),
40 multicast_id_(0),
41 joined_group_(false),
42 local_port_(_local.port()),
43 tp_reassembler_(std::make_shared<tp::tp_reassembler>(_configuration->get_max_message_size_unreliable(), _io)),
44 tp_cleanup_timer_(_io) {
45 is_supporting_someip_tp_ = true;
46
47 boost::system::error_code ec;
48
49 boost::asio::socket_base::reuse_address optionReuseAddress(true);
50 unicast_socket_.set_option(optionReuseAddress, ec);
51 boost::asio::detail::throw_error(ec, "reuse address");
52
53 #ifndef _WIN32
54 // If specified, bind to device
55 std::string its_device(configuration_->get_device());
56 if (its_device != "") {
57 if (setsockopt(unicast_socket_.native_handle(),
58 SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) {
59 VSOMEIP_WARNING << "UDP Server: Could not bind to device \"" << its_device << "\"";
60 }
61 }
62 #endif
63
64 unicast_socket_.bind(_local, ec);
65 boost::asio::detail::throw_error(ec, "bind");
66
67 if (local_.address().is_v4()) {
68 boost::asio::ip::multicast::outbound_interface option(_local.address().to_v4());
69 unicast_socket_.set_option(option, ec);
70 boost::asio::detail::throw_error(ec, "outbound interface option IPv4");
71 } else if (local_.address().is_v6()) {
72 boost::asio::ip::multicast::outbound_interface option(
73 static_cast<unsigned int>(local_.address().to_v6().scope_id()));
74 unicast_socket_.set_option(option, ec);
75 boost::asio::detail::throw_error(ec, "outbound interface option IPv6");
76 }
77
78 boost::asio::socket_base::broadcast option(true);
79 unicast_socket_.set_option(option, ec);
80 boost::asio::detail::throw_error(ec, "broadcast option");
81
82 const int its_udp_recv_buffer_size =
83 configuration_->get_udp_receive_buffer_size();
84 unicast_socket_.set_option(boost::asio::socket_base::receive_buffer_size(
85 its_udp_recv_buffer_size), ec);
86 if (ec) {
87 VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't set "
88 << "SO_RCVBUF: " << ec.message() << " to: " << std::dec
89 << its_udp_recv_buffer_size << " local port: " << std::dec
90 << local_port_;
91 }
92
93 boost::asio::socket_base::receive_buffer_size its_option;
94 unicast_socket_.get_option(its_option, ec);
95 #ifdef __linux__
96 // If regular setting of the buffer size did not work, try to force
97 // (requires CAP_NET_ADMIN to be successful)
98 if (its_option.value() < 0
99 || its_option.value() < its_udp_recv_buffer_size) {
100 ec.assign(setsockopt(unicast_socket_.native_handle(),
101 SOL_SOCKET, SO_RCVBUFFORCE,
102 &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
103 boost::system::generic_category());
104 if (!ec) {
105 VSOMEIP_INFO << "udp_server_endpoint_impl: "
106 << "SO_RCVBUFFORCE successful.";
107 }
108 unicast_socket_.get_option(its_option, ec);
109 }
110 #endif
111 if (ec) {
112 VSOMEIP_WARNING << "udp_server_endpoint_impl: couldn't get "
113 << "SO_RCVBUF: " << ec.message() << " local port:"
114 << std::dec << local_port_;
115 } else {
116 VSOMEIP_INFO << "udp_server_endpoint_impl: SO_RCVBUF is: "
117 << std::dec << its_option.value()
118 << " (" << its_udp_recv_buffer_size << ") local port:"
119 << std::dec << local_port_;
120 }
121
122
123 #ifdef _WIN32
124 const char* optval("0001");
125 ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO,
126 optval, sizeof(optval));
127 #else
128 int optval(1);
129 ::setsockopt(unicast_socket_.native_handle(), IPPROTO_IP, IP_PKTINFO,
130 &optval, sizeof(optval));
131 #endif
132 }
133
~udp_server_endpoint_impl()134 udp_server_endpoint_impl::~udp_server_endpoint_impl() {
135 }
136
is_local() const137 bool udp_server_endpoint_impl::is_local() const {
138 return false;
139 }
140
start()141 void udp_server_endpoint_impl::start() {
142 receive();
143 }
144
stop()145 void udp_server_endpoint_impl::stop() {
146 server_endpoint_impl::stop();
147 {
148 std::lock_guard<std::mutex> its_lock(unicast_mutex_);
149
150 if (unicast_socket_.is_open()) {
151 boost::system::error_code its_error;
152 unicast_socket_.shutdown(socket_type::shutdown_both, its_error);
153 unicast_socket_.close(its_error);
154 }
155 }
156
157 {
158 std::lock_guard<std::mutex> its_lock(multicast_mutex_);
159
160 if (multicast_socket_ && multicast_socket_->is_open()) {
161 boost::system::error_code its_error;
162 multicast_socket_->shutdown(socket_type::shutdown_both, its_error);
163 multicast_socket_->close(its_error);
164 }
165 }
166
167 tp_reassembler_->stop();
168 }
169
receive()170 void udp_server_endpoint_impl::receive() {
171 receive_unicast();
172 }
173
receive_unicast()174 void udp_server_endpoint_impl::receive_unicast() {
175
176 std::lock_guard<std::mutex> its_lock(unicast_mutex_);
177
178 if(unicast_socket_.is_open()) {
179 unicast_socket_.async_receive_from(
180 boost::asio::buffer(&unicast_recv_buffer_[0], max_message_size_),
181 unicast_remote_,
182 std::bind(
183 &udp_server_endpoint_impl::on_unicast_received,
184 std::dynamic_pointer_cast<
185 udp_server_endpoint_impl >(shared_from_this()),
186 std::placeholders::_1,
187 std::placeholders::_2,
188 std::placeholders::_3
189 )
190 );
191 }
192 }
193
//
// receive_multicast is called with multicast_mutex_ being held
//
receive_multicast(uint8_t _multicast_id)197 void udp_server_endpoint_impl::receive_multicast(uint8_t _multicast_id) {
198
199 if (_multicast_id == multicast_id_ && multicast_socket_ && multicast_socket_->is_open()) {
200 multicast_socket_->async_receive_from(
201 boost::asio::buffer(&multicast_recv_buffer_[0], max_message_size_),
202 multicast_remote_,
203 std::bind(
204 &udp_server_endpoint_impl::on_multicast_received,
205 std::dynamic_pointer_cast<
206 udp_server_endpoint_impl >(shared_from_this()),
207 std::placeholders::_1,
208 std::placeholders::_2,
209 std::placeholders::_3,
210 _multicast_id
211 )
212 );
213 }
214 }
215
send_to(const std::shared_ptr<endpoint_definition> _target,const byte_t * _data,uint32_t _size)216 bool udp_server_endpoint_impl::send_to(
217 const std::shared_ptr<endpoint_definition> _target,
218 const byte_t *_data, uint32_t _size) {
219
220 std::lock_guard<std::mutex> its_lock(mutex_);
221 endpoint_type its_target(_target->get_address(), _target->get_port());
222 return send_intern(its_target, _data, _size);
223 }
224
send_error(const std::shared_ptr<endpoint_definition> _target,const byte_t * _data,uint32_t _size)225 bool udp_server_endpoint_impl::send_error(
226 const std::shared_ptr<endpoint_definition> _target,
227 const byte_t *_data, uint32_t _size) {
228
229 bool ret(false);
230 std::lock_guard<std::mutex> its_lock(mutex_);
231 const endpoint_type its_target(_target->get_address(), _target->get_port());
232 const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target));
233 auto& its_qpair = target_queue_iterator->second;
234 const bool queue_size_zero_on_entry(its_qpair.second.empty());
235
236 if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK &&
237 check_queue_limit(_data, _size, its_qpair.first)) {
238 its_qpair.second.emplace_back(
239 std::make_shared<message_buffer_t>(_data, _data + _size));
240 its_qpair.first += _size;
241
242 if (queue_size_zero_on_entry) { // no writing in progress
243 send_queued(target_queue_iterator);
244 }
245 ret = true;
246 }
247 return ret;
248 }
249
send_queued(const queue_iterator_type _queue_iterator)250 void udp_server_endpoint_impl::send_queued(
251 const queue_iterator_type _queue_iterator) {
252
253 message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front();
254 #if 0
255 std::stringstream msg;
256 msg << "usei::sq(" << _queue_iterator->first.address().to_string() << ":"
257 << _queue_iterator->first.port() << "): ";
258 for (std::size_t i = 0; i < its_buffer->size(); ++i)
259 msg << std::hex << std::setw(2) << std::setfill('0')
260 << (int)(*its_buffer)[i] << " ";
261 VSOMEIP_INFO << msg.str();
262 #endif
263 std::lock_guard<std::mutex> its_lock(unicast_mutex_);
264
265 unicast_socket_.async_send_to(
266 boost::asio::buffer(*its_buffer),
267 _queue_iterator->first,
268 std::bind(
269 &udp_server_endpoint_base_impl::send_cbk,
270 shared_from_this(),
271 _queue_iterator,
272 std::placeholders::_1,
273 std::placeholders::_2
274 )
275 );
276 }
277
get_configured_times_from_endpoint(service_t _service,method_t _method,std::chrono::nanoseconds * _debouncing,std::chrono::nanoseconds * _maximum_retention) const278 void udp_server_endpoint_impl::get_configured_times_from_endpoint(
279 service_t _service, method_t _method,
280 std::chrono::nanoseconds *_debouncing,
281 std::chrono::nanoseconds *_maximum_retention) const {
282
283 configuration_->get_configured_timing_responses(_service,
284 udp_server_endpoint_base_impl::local_.address().to_string(),
285 udp_server_endpoint_base_impl::local_.port(), _method,
286 _debouncing, _maximum_retention);
287 }
288
//
// Both is_joined methods must be called with multicast_mutex_ being held!
//
is_joined(const std::string & _address) const292 bool udp_server_endpoint_impl::is_joined(const std::string &_address) const {
293
294 return (joined_.find(_address) != joined_.end());
295 }
296
is_joined(const std::string & _address,bool * _received) const297 bool udp_server_endpoint_impl::is_joined(
298 const std::string &_address, bool* _received) const {
299
300 const auto found_address = joined_.find(_address);
301 if (found_address != joined_.end()) {
302 *_received = found_address->second;
303 } else {
304 *_received = false;
305 }
306
307 return (found_address != joined_.end());
308 }
309
join(const std::string & _address)310 void udp_server_endpoint_impl::join(const std::string &_address) {
311
312 std::lock_guard<std::mutex> its_lock(multicast_mutex_);
313 join_unlocked(_address);
314 }
315
join_unlocked(const std::string & _address)316 void udp_server_endpoint_impl::join_unlocked(const std::string &_address) {
317
318 bool has_received(false);
319
320 //
321 // join_func must be called with multicast_mutex_ being hold!
322 //
323 auto join_func = [this](const std::string &_address) {
324 try {
325 VSOMEIP_DEBUG << "Joining to multicast group " << _address
326 << " from " << local_.address().to_string();
327
328 boost::system::error_code ec;
329
330 bool is_v4(false);
331 bool is_v6(false);
332 {
333 std::lock_guard<std::mutex> its_lock(local_mutex_);
334 is_v4 = local_.address().is_v4();
335 is_v6 = local_.address().is_v6();
336 }
337
338 if (multicast_recv_buffer_.empty())
339 multicast_recv_buffer_.resize(VSOMEIP_MAX_UDP_MESSAGE_SIZE, 0);
340
341 if (!multicast_local_) {
342 if (is_v4) {
343 multicast_local_ = std::unique_ptr<endpoint_type>(
344 new endpoint_type(boost::asio::ip::address_v4::any(), local_port_));
345 }
346 if (is_v6) {
347 multicast_local_ = std::unique_ptr<endpoint_type>(
348 new endpoint_type(boost::asio::ip::address_v6::any(), local_port_));
349 }
350 }
351
352 if (!multicast_socket_) {
353 multicast_socket_ = std::unique_ptr<socket_type>(
354 new socket_type(service_, local_.protocol()));
355
356 boost::asio::socket_base::reuse_address optionReuseAddress(true);
357 multicast_socket_->set_option(optionReuseAddress, ec);
358 boost::asio::detail::throw_error(ec, "reuse address in multicast");
359 boost::asio::socket_base::broadcast optionBroadcast(true);
360 multicast_socket_->set_option(optionBroadcast, ec);
361 boost::asio::detail::throw_error(ec, "set broadcast option");
362
363 multicast_socket_->bind(*multicast_local_, ec);
364 boost::asio::detail::throw_error(ec, "bind multicast");
365
366 const int its_udp_recv_buffer_size =
367 configuration_->get_udp_receive_buffer_size();
368 multicast_socket_->set_option(boost::asio::socket_base::receive_buffer_size(
369 its_udp_recv_buffer_size), ec);
370 if (ec) {
371 VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't set "
372 << "SO_RCVBUF: " << ec.message() << " to: " << std::dec
373 << its_udp_recv_buffer_size << " local port: " << std::dec
374 << local_port_;
375 }
376
377 boost::asio::socket_base::receive_buffer_size its_option;
378 multicast_socket_->get_option(its_option, ec);
379 #ifdef __linux__
380 // If regular setting of the buffer size did not work, try to force
381 // (requires CAP_NET_ADMIN to be successful)
382 if (its_option.value() < 0
383 || its_option.value() < its_udp_recv_buffer_size) {
384 ec.assign(setsockopt(multicast_socket_->native_handle(),
385 SOL_SOCKET, SO_RCVBUFFORCE,
386 &its_udp_recv_buffer_size, sizeof(its_udp_recv_buffer_size)),
387 boost::system::generic_category());
388 if (!ec) {
389 VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: "
390 << "SO_RCVBUFFORCE: successful.";
391 }
392 multicast_socket_->get_option(its_option, ec);
393 }
394 #endif
395 if (ec) {
396 VSOMEIP_WARNING << "udp_server_endpoint_impl<multicast>: couldn't get "
397 << "SO_RCVBUF: " << ec.message() << " local port:"
398 << std::dec << local_port_;
399 } else {
400 VSOMEIP_INFO << "udp_server_endpoint_impl<multicast>: SO_RCVBUF is: "
401 << std::dec << its_option.value()
402 << " (" << its_udp_recv_buffer_size << ") local port:"
403 << std::dec << local_port_;
404 }
405
406 #ifdef _WIN32
407 const char* optval("0001");
408 if (is_v4) {
409 ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO,
410 optval, sizeof(optval));
411 } else if (is_v6) {
412 ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IPV6, IPV6_PKTINFO,
413 optval, sizeof(optval));
414 }
415 #else
416 int optval(1);
417 if (is_v4) {
418 ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IP, IP_PKTINFO,
419 &optval, sizeof(optval));
420 } else {
421 ::setsockopt(multicast_socket_->native_handle(), IPPROTO_IPV6, IPV6_RECVPKTINFO,
422 &optval, sizeof(optval));
423 }
424 #endif
425 multicast_id_++;
426 receive_multicast(multicast_id_);
427 }
428
429 if (is_v4) {
430 multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
431 multicast_socket_->set_option(
432 boost::asio::ip::multicast::enable_loopback(false));
433 multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
434 boost::asio::ip::address::from_string(_address).to_v4(),
435 local_.address().to_v4()));
436 } else if (is_v6) {
437 multicast_socket_->set_option(ip::udp_ext::socket::reuse_address(true));
438 multicast_socket_->set_option(
439 boost::asio::ip::multicast::enable_loopback(false));
440 multicast_socket_->set_option(boost::asio::ip::multicast::join_group(
441 boost::asio::ip::address::from_string(_address).to_v6(),
442 local_.address().to_v6().scope_id()));
443 }
444
445 joined_[_address] = false;
446 joined_group_ = true;
447
448 } catch (const std::exception &e) {
449 VSOMEIP_ERROR << "udp_server_endpoint_impl::join" << ":" << e.what()
450 << " address: " << _address;
451 }
452 };
453
454 if (!is_joined(_address, &has_received)) {
455 join_func(_address);
456 } else if (!has_received) {
457 // joined the multicast group but didn't receive a event yet -> rejoin
458 leave_unlocked(_address);
459 join_func(_address);
460 }
461 }
462
leave(const std::string & _address)463 void udp_server_endpoint_impl::leave(const std::string &_address) {
464
465 std::lock_guard<std::mutex> its_lock(multicast_mutex_);
466 leave_unlocked(_address);
467 }
468
leave_unlocked(const std::string & _address)469 void udp_server_endpoint_impl::leave_unlocked(const std::string &_address) {
470
471 try {
472 if (is_joined(_address)) {
473 VSOMEIP_DEBUG << "Leaving the multicast group " << _address
474 << " from " << local_.address().to_string();
475
476 bool is_v4(false);
477 bool is_v6(false);
478 {
479 std::lock_guard<std::mutex> its_lock(local_mutex_);
480 is_v4 = local_.address().is_v4();
481 is_v6 = local_.address().is_v6();
482 }
483 if (is_v4) {
484 multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
485 boost::asio::ip::address::from_string(_address)));
486 } else if (is_v6) {
487 multicast_socket_->set_option(boost::asio::ip::multicast::leave_group(
488 boost::asio::ip::address::from_string(_address)));
489 }
490
491 joined_.erase(_address);
492 if (0 == joined_.size()) {
493 joined_group_ = false;
494
495 boost::system::error_code ec;
496 multicast_socket_->cancel(ec);
497
498 multicast_socket_.reset(nullptr);
499 multicast_local_.reset(nullptr);
500 }
501 }
502 }
503 catch (const std::exception &e) {
504 VSOMEIP_ERROR << __func__ << ":" << e.what()
505 << " address: " << _address;
506 }
507 }
508
add_default_target(service_t _service,const std::string & _address,uint16_t _port)509 void udp_server_endpoint_impl::add_default_target(
510 service_t _service, const std::string &_address, uint16_t _port) {
511 std::lock_guard<std::mutex> its_lock(default_targets_mutex_);
512 endpoint_type its_endpoint(
513 boost::asio::ip::address::from_string(_address), _port);
514 default_targets_[_service] = its_endpoint;
515 }
516
remove_default_target(service_t _service)517 void udp_server_endpoint_impl::remove_default_target(service_t _service) {
518 std::lock_guard<std::mutex> its_lock(default_targets_mutex_);
519 default_targets_.erase(_service);
520 }
521
get_default_target(service_t _service,udp_server_endpoint_impl::endpoint_type & _target) const522 bool udp_server_endpoint_impl::get_default_target(service_t _service,
523 udp_server_endpoint_impl::endpoint_type &_target) const {
524 std::lock_guard<std::mutex> its_lock(default_targets_mutex_);
525 bool is_valid(false);
526 auto find_service = default_targets_.find(_service);
527 if (find_service != default_targets_.end()) {
528 _target = find_service->second;
529 is_valid = true;
530 }
531 return is_valid;
532 }
533
get_local_port() const534 std::uint16_t udp_server_endpoint_impl::get_local_port() const {
535 return local_port_;
536 }
537
set_local_port(std::uint16_t _port)538 void udp_server_endpoint_impl::set_local_port(std::uint16_t _port) {
539 (void)_port;
540 }
541
on_unicast_received(boost::system::error_code const & _error,std::size_t _bytes,boost::asio::ip::address const & _destination)542 void udp_server_endpoint_impl::on_unicast_received(
543 boost::system::error_code const &_error,
544 std::size_t _bytes,
545 boost::asio::ip::address const &_destination) {
546
547 if (_error != boost::asio::error::operation_aborted) {
548 {
549 // By locking the multicast mutex here it is ensured that unicast
550 // & multicast messages are not processed in parallel. This aligns
551 // the behavior of endpoints with one and two active sockets.
552 std::lock_guard<std::mutex> its_lock(multicast_mutex_);
553 on_message_received(_error, _bytes, _destination,
554 unicast_remote_, unicast_recv_buffer_);
555 }
556 receive_unicast();
557 }
558 }
559
on_multicast_received(boost::system::error_code const & _error,std::size_t _bytes,boost::asio::ip::address const & _destination,uint8_t _multicast_id)560 void udp_server_endpoint_impl::on_multicast_received(
561 boost::system::error_code const &_error,
562 std::size_t _bytes,
563 boost::asio::ip::address const &_destination,
564 uint8_t _multicast_id) {
565
566 std::lock_guard<std::mutex> its_lock(multicast_mutex_);
567 if (_error != boost::asio::error::operation_aborted) {
568 // Filter messages sent from the same source address
569 if (multicast_remote_.address() != local_.address()) {
570 on_message_received(_error, _bytes, _destination,
571 multicast_remote_, multicast_recv_buffer_);
572 }
573
574 receive_multicast(_multicast_id);
575 }
576 }
577
on_message_received(boost::system::error_code const & _error,std::size_t _bytes,boost::asio::ip::address const & _destination,endpoint_type const & _remote,message_buffer_t const & _buffer)578 void udp_server_endpoint_impl::on_message_received(
579 boost::system::error_code const &_error, std::size_t _bytes,
580 boost::asio::ip::address const &_destination,
581 endpoint_type const &_remote,
582 message_buffer_t const &_buffer) {
583 #if 0
584 std::stringstream msg;
585 msg << "usei::rcb(" << _error.message() << "): ";
586 for (std::size_t i = 0; i < _bytes; ++i)
587 msg << std::hex << std::setw(2) << std::setfill('0')
588 << (int) recv_buffer_[i] << " ";
589 VSOMEIP_INFO << msg.str();
590 #endif
591 std::shared_ptr<routing_host> its_host = routing_host_.lock();
592
593 if (its_host) {
594 if (!_error && 0 < _bytes) {
595 std::size_t remaining_bytes = _bytes;
596 std::size_t i = 0;
597 const boost::asio::ip::address its_remote_address(_remote.address());
598 const std::uint16_t its_remote_port(_remote.port());
599 do {
600 uint64_t read_message_size
601 = utility::get_message_size(&_buffer[i],
602 remaining_bytes);
603 if (read_message_size > MESSAGE_SIZE_UNLIMITED) {
604 VSOMEIP_ERROR << "Message size exceeds allowed maximum!";
605 return;
606 }
607 uint32_t current_message_size = static_cast<uint32_t>(read_message_size);
608 if (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE &&
609 current_message_size <= remaining_bytes) {
610 if (remaining_bytes - current_message_size > remaining_bytes) {
611 VSOMEIP_ERROR << "buffer underflow in udp client endpoint ~> abort!";
612 return;
613 } else if (current_message_size > VSOMEIP_RETURN_CODE_POS &&
614 (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION ||
615 !utility::is_valid_message_type(tp::tp::tp_flag_unset(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) ||
616 /*!utility::is_valid_return_code(static_cast<return_code_e>(_buffer[i + VSOMEIP_RETURN_CODE_POS])) ||*/
617 (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS]) && get_local_port() == configuration_->get_sd_port())
618 )) {
619 if (_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) {
620 VSOMEIP_ERROR << "use: Wrong protocol version: 0x"
621 << std::hex << std::setw(2) << std::setfill('0')
622 << std::uint32_t(_buffer[i + VSOMEIP_PROTOCOL_VERSION_POS])
623 << " local: " << get_address_port_local()
624 << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
625 // ensure to send back a message w/ wrong protocol version
626 its_host->on_message(&_buffer[i],
627 VSOMEIP_SOMEIP_HEADER_SIZE + 8, this,
628 _destination,
629 VSOMEIP_ROUTING_CLIENT,
630 std::make_pair(ANY_UID, ANY_GID),
631 its_remote_address, its_remote_port);
632 } else if (!utility::is_valid_message_type(tp::tp::tp_flag_unset(
633 _buffer[i + VSOMEIP_MESSAGE_TYPE_POS]))) {
634 VSOMEIP_ERROR << "use: Invalid message type: 0x"
635 << std::hex << std::setw(2) << std::setfill('0')
636 << std::uint32_t(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
637 << " local: " << get_address_port_local()
638 << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
639 } else if (!utility::is_valid_return_code(static_cast<return_code_e>(
640 _buffer[i + VSOMEIP_RETURN_CODE_POS]))) {
641 VSOMEIP_ERROR << "use: Invalid return code: 0x"
642 << std::hex << std::setw(2) << std::setfill('0')
643 << std::uint32_t(_buffer[i + VSOMEIP_RETURN_CODE_POS])
644 << " local: " << get_address_port_local()
645 << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
646 } else if (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
647 && get_local_port() == configuration_->get_sd_port()) {
648 VSOMEIP_WARNING << "use: Received a SomeIP/TP message on SD port:"
649 << " local: " << get_address_port_local()
650 << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
651 }
652 return;
653 }
654 remaining_bytes -= current_message_size;
655 const service_t its_service = VSOMEIP_BYTES_TO_WORD(_buffer[i + VSOMEIP_SERVICE_POS_MIN],
656 _buffer[i + VSOMEIP_SERVICE_POS_MAX]);
657 if (utility::is_request(
658 _buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) {
659 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
660 _buffer[i + VSOMEIP_CLIENT_POS_MIN],
661 _buffer[i + VSOMEIP_CLIENT_POS_MAX]);
662 if (its_client != MAGIC_COOKIE_CLIENT) {
663 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
664 _buffer[i + VSOMEIP_SESSION_POS_MIN],
665 _buffer[i + VSOMEIP_SESSION_POS_MAX]);
666 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
667 _buffer[i + VSOMEIP_METHOD_POS_MIN],
668 _buffer[i + VSOMEIP_METHOD_POS_MAX]);
669
670 std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
671 requests_[its_client]
672 [std::make_tuple(its_service, its_method, its_session)] = _remote;
673 }
674 } else if (its_service != VSOMEIP_SD_SERVICE
675 && utility::is_notification(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])
676 && joined_group_) {
677 boost::system::error_code ec;
678 const auto found_address = joined_.find(_destination.to_string(ec));
679 if (found_address != joined_.end()) {
680 found_address->second = true;
681 }
682 }
683 if (tp::tp::tp_flag_is_set(_buffer[i + VSOMEIP_MESSAGE_TYPE_POS])) {
684 const method_t its_method = VSOMEIP_BYTES_TO_WORD(_buffer[i + VSOMEIP_METHOD_POS_MIN],
685 _buffer[i + VSOMEIP_METHOD_POS_MAX]);
686 if (!tp_segmentation_enabled(its_service, its_method)) {
687 VSOMEIP_WARNING << "use: Received a SomeIP/TP message for service: 0x" << std::hex << its_service
688 << " method: 0x" << its_method << " which is not configured for TP:"
689 << " local: " << get_address_port_local()
690 << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
691 return;
692 }
693 const auto res = tp_reassembler_->process_tp_message(
694 &_buffer[i], current_message_size,
695 its_remote_address, its_remote_port);
696 if (res.first) {
697 if (utility::is_request(res.second[VSOMEIP_MESSAGE_TYPE_POS])) {
698 const client_t its_client = VSOMEIP_BYTES_TO_WORD(
699 res.second[VSOMEIP_CLIENT_POS_MIN],
700 res.second[VSOMEIP_CLIENT_POS_MAX]);
701 if (its_client != MAGIC_COOKIE_CLIENT) {
702 const service_t its_service = VSOMEIP_BYTES_TO_WORD(
703 res.second[VSOMEIP_SERVICE_POS_MIN],
704 res.second[VSOMEIP_SERVICE_POS_MAX]);
705 const method_t its_method = VSOMEIP_BYTES_TO_WORD(
706 res.second[VSOMEIP_METHOD_POS_MIN],
707 res.second[VSOMEIP_METHOD_POS_MAX]);
708 const session_t its_session = VSOMEIP_BYTES_TO_WORD(
709 res.second[VSOMEIP_SESSION_POS_MIN],
710 res.second[VSOMEIP_SESSION_POS_MAX]);
711
712 std::lock_guard<std::mutex> its_requests_guard(requests_mutex_);
713 requests_[its_client]
714 [std::make_tuple(its_service, its_method, its_session)] = _remote;
715 }
716 } else if (its_service != VSOMEIP_SD_SERVICE
717 && utility::is_notification(res.second[VSOMEIP_MESSAGE_TYPE_POS])
718 && joined_group_) {
719 boost::system::error_code ec;
720 const auto found_address = joined_.find(_destination.to_string(ec));
721 if (found_address != joined_.end()) {
722 found_address->second = true;
723 }
724 }
725 its_host->on_message(&res.second[0],
726 static_cast<std::uint32_t>(res.second.size()),
727 this, _destination, VSOMEIP_ROUTING_CLIENT,
728 std::make_pair(ANY_UID, ANY_GID),
729 its_remote_address, its_remote_port);
730 }
731 } else {
732 if (its_service != VSOMEIP_SD_SERVICE ||
733 (current_message_size > VSOMEIP_SOMEIP_HEADER_SIZE &&
734 current_message_size >= remaining_bytes)) {
735 its_host->on_message(&_buffer[i],
736 current_message_size, this, _destination,
737 VSOMEIP_ROUTING_CLIENT,
738 std::make_pair(ANY_UID, ANY_GID),
739 its_remote_address, its_remote_port);
740 } else {
741 //ignore messages for service discovery with shorter SomeIP length
742 VSOMEIP_ERROR << "Received an unreliable vSomeIP SD message with too short length field"
743 << " local: " << get_address_port_local()
744 << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
745 }
746 }
747 i += current_message_size;
748 } else {
749 VSOMEIP_ERROR << "Received an unreliable vSomeIP message with bad length field"
750 << " local: " << get_address_port_local()
751 << " remote: " << its_remote_address << ":" << std::dec << its_remote_port;
752 if (remaining_bytes > VSOMEIP_SERVICE_POS_MAX) {
753 service_t its_service = VSOMEIP_BYTES_TO_WORD(_buffer[VSOMEIP_SERVICE_POS_MIN],
754 _buffer[VSOMEIP_SERVICE_POS_MAX]);
755 if (its_service != VSOMEIP_SD_SERVICE) {
756 if (read_message_size == 0) {
757 VSOMEIP_ERROR << "Ignoring unreliable vSomeIP message with SomeIP message length 0!";
758 } else {
759 auto its_endpoint_host = endpoint_host_.lock();
760 if (its_endpoint_host) {
761 its_endpoint_host->on_error(&_buffer[i],
762 (uint32_t)remaining_bytes, this,
763 its_remote_address, its_remote_port);
764 }
765 }
766 }
767 }
768 remaining_bytes = 0;
769 }
770 } while (remaining_bytes > 0);
771 }
772 }
773 }
774
print_status()775 void udp_server_endpoint_impl::print_status() {
776 std::lock_guard<std::mutex> its_lock(mutex_);
777
778 VSOMEIP_INFO << "status use: " << std::dec << local_port_
779 << " number queues: " << std::dec << queues_.size()
780 << " recv_buffer: "
781 << std::dec << unicast_recv_buffer_.capacity()
782 << " multicast_recv_buffer: "
783 << std::dec << multicast_recv_buffer_.capacity();
784
785 for (const auto &c : queues_) {
786 std::size_t its_data_size(0);
787 std::size_t its_queue_size(0);
788 its_queue_size = c.second.second.size();
789 its_data_size = c.second.first;
790
791 boost::system::error_code ec;
792 VSOMEIP_INFO << "status use: client: "
793 << c.first.address().to_string(ec) << ":"
794 << std::dec << c.first.port()
795 << " queue: " << std::dec << its_queue_size
796 << " data: " << std::dec << its_data_size;
797 }
798 }
799
get_remote_information(const queue_iterator_type _queue_iterator) const800 std::string udp_server_endpoint_impl::get_remote_information(
801 const queue_iterator_type _queue_iterator) const {
802 boost::system::error_code ec;
803 return _queue_iterator->first.address().to_string(ec) + ":"
804 + std::to_string(_queue_iterator->first.port());
805 }
806
get_remote_information(const endpoint_type & _remote) const807 std::string udp_server_endpoint_impl::get_remote_information(
808 const endpoint_type& _remote) const {
809 boost::system::error_code ec;
810 return _remote.address().to_string(ec) + ":"
811 + std::to_string(_remote.port());
812 }
813
is_reliable() const814 bool udp_server_endpoint_impl::is_reliable() const {
815 return false;
816 }
817
get_address_port_local() const818 const std::string udp_server_endpoint_impl::get_address_port_local() const {
819
820 std::lock_guard<std::mutex> its_lock(unicast_mutex_);
821 std::string its_address_port;
822 its_address_port.reserve(21);
823 boost::system::error_code ec;
824 if (unicast_socket_.is_open()) {
825 endpoint_type its_local_endpoint = unicast_socket_.local_endpoint(ec);
826 if (!ec) {
827 its_address_port += its_local_endpoint.address().to_string(ec);
828 its_address_port += ":";
829 its_address_port += std::to_string(its_local_endpoint.port());
830 }
831 }
832 return its_address_port;
833 }
834
tp_segmentation_enabled(service_t _service,method_t _method) const835 bool udp_server_endpoint_impl::tp_segmentation_enabled(
836 service_t _service, method_t _method) const {
837
838 return configuration_->tp_segment_messages_service_to_client(_service,
839 local_.address().to_string(),
840 local_.port(), _method);
841 }
842
843 } // namespace vsomeip_v3
844