1 // Copyright (C) 2019 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <iomanip>
7 
8 #include "../include/tp_reassembler.hpp"
9 
10 #include <vsomeip/defines.hpp>
11 #include <vsomeip/enumeration_types.hpp>
12 #include <vsomeip/internal/logger.hpp>
13 
14 #include "../include/tp.hpp"
15 #include "../../utility/include/byteorder.hpp"
16 
17 #ifdef ANDROID
18 #include "../../configuration/include/internal_android.hpp"
19 #else
20 #include "../../configuration/include/internal.hpp"
21 #endif // ANDROID
22 
23 namespace vsomeip_v3 {
24 namespace tp {
25 
tp_reassembler(std::uint32_t _max_message_size,boost::asio::io_service & _io)26 tp_reassembler::tp_reassembler(std::uint32_t _max_message_size, boost::asio::io_service &_io) :
27     max_message_size_(_max_message_size),
28     cleanup_timer_running_(false),
29     cleanup_timer_(_io) {
30 }
31 
process_tp_message(const byte_t * const _data,std::uint32_t _data_size,const boost::asio::ip::address & _address,std::uint16_t _port)32 std::pair<bool, message_buffer_t> tp_reassembler::process_tp_message(
33         const byte_t* const _data, std::uint32_t _data_size,
34         const boost::asio::ip::address& _address, std::uint16_t _port) {
35     std::pair<bool, message_buffer_t> ret;
36     if (_data_size < VSOMEIP_FULL_HEADER_SIZE) {
37         return std::make_pair(false, message_buffer_t());
38     }
39 
40     cleanup_timer_start(false);
41 
42     const service_t its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
43                                                         _data[VSOMEIP_SERVICE_POS_MAX]);
44     const method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
45                                                       _data[VSOMEIP_METHOD_POS_MAX]);
46     const client_t its_client = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_CLIENT_POS_MIN],
47                                                       _data[VSOMEIP_CLIENT_POS_MAX]);
48     const session_t its_session = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SESSION_POS_MIN],
49                                                       _data[VSOMEIP_SESSION_POS_MAX]);
50     const interface_version_t its_interface_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
51     const message_type_e its_msg_type = tp::tp_flag_unset(_data[VSOMEIP_MESSAGE_TYPE_POS]);
52 
53     const std::uint64_t its_tp_message_id = ((static_cast<std::uint64_t>(its_service) << 48) |
54                                              (static_cast<std::uint64_t>(its_method) << 32) |
55                                              (static_cast<std::uint64_t>(its_client) << 16) |
56                                              (static_cast<std::uint64_t>(its_interface_version) << 8) |
57                                              (static_cast<std::uint64_t>(its_msg_type)));
58 
59     std::lock_guard<std::mutex> its_lock(mutex_);
60     ret.first = false;
61     const auto found_ip = tp_messages_.find(_address);
62     if (found_ip != tp_messages_.end()) {
63         const auto found_port = found_ip->second.find(_port);
64         if (found_port != found_ip->second.end()) {
65             auto found_tp_msg = found_port->second.find(its_tp_message_id);
66             if (found_tp_msg != found_port->second.end()) {
67                 if (found_tp_msg->second.first == its_session) {
68                     // received additional segment for already known message
69                     if (found_tp_msg->second.second.add_segment(_data, _data_size)) {
70                         // message is complete
71                         ret.first = true;
72                         ret.second = found_tp_msg->second.second.get_message();
73                         // cleanup tp_message as message was moved and cleanup map
74                         found_port->second.erase(its_tp_message_id);
75                         if (found_port->second.empty()) {
76                             found_ip->second.erase(found_port);
77                             if (found_ip->second.empty()) {
78                                 tp_messages_.erase(found_ip);
79                             }
80                         }
81                     }
82                 } else {
83                     VSOMEIP_WARNING << __func__ << ": Received new segment "
84                             "although old one is not finished yet. Dropping "
85                             "old. ("
86                             << std::hex << std::setw(4) << std::setfill('0') << its_client << ") ["
87                             << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
88                             << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
89                             << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_interface_version) << "."
90                             << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_msg_type) << "] Old: 0x"
91                             << std::hex << std::setw(4) << std::setfill('0') << found_tp_msg->second.first << ", new: 0x"
92                             << std::hex << std::setw(4) << std::setfill('0') << its_session;
93                     // new segment with different session id -> throw away current
94                     found_tp_msg->second.first = its_session;
95                     found_tp_msg->second.second = tp_message(_data, _data_size, max_message_size_);
96                 }
97             } else {
98                 found_port->second.emplace(
99                         std::make_pair(its_tp_message_id,
100                                 std::make_pair(its_session,
101                                         tp_message(_data, _data_size, max_message_size_))));
102             }
103         } else {
104             found_ip->second[_port].emplace(
105                     std::make_pair(its_tp_message_id,
106                             std::make_pair(its_session,
107                                     tp_message(_data, _data_size, max_message_size_))));
108         }
109     } else {
110         tp_messages_[_address][_port].emplace(
111                 std::make_pair(its_tp_message_id,
112                         std::make_pair(its_session,
113                                 tp_message(_data, _data_size, max_message_size_))));
114     }
115     return ret;
116 }
117 
cleanup_unfinished_messages()118 bool tp_reassembler::cleanup_unfinished_messages() {
119     std::lock_guard<std::mutex> its_lock(mutex_);
120     const std::chrono::steady_clock::time_point now =
121             std::chrono::steady_clock::now();
122     for (auto ip_iter = tp_messages_.begin(); ip_iter != tp_messages_.end();) {
123         for (auto port_iter = ip_iter->second.begin();
124                 port_iter != ip_iter->second.end();) {
125             for (auto tp_id_iter = port_iter->second.begin();
126                     tp_id_iter != port_iter->second.end();) {
127                 if (std::chrono::duration_cast<std::chrono::milliseconds>(
128                         now - tp_id_iter->second.second.get_creation_time()).count()
129                         > 5000) {
130                     // message is older than 5 seconds delete it
131                     const service_t its_service = static_cast<service_t>(tp_id_iter->first >> 48);
132                     const method_t its_method = static_cast<method_t>(tp_id_iter->first >> 32);
133                     const client_t its_client = static_cast<client_t>(tp_id_iter->first >> 16);
134                     const interface_version_t its_interface_version = static_cast<interface_version_t>(tp_id_iter->first >> 8);
135                     const message_type_e its_msg_type = static_cast<message_type_e>(tp_id_iter->first >> 0);
136                     VSOMEIP_WARNING << __func__
137                             << ": deleting unfinished SOME/IP-TP message from: "
138                             << ip_iter->first.to_string() << ":" << std::dec
139                             << port_iter->first << " ("
140                             << std::hex << std::setw(4) << std::setfill('0') << its_client << ") ["
141                             << std::hex << std::setw(4) << std::setfill('0') << its_service << "."
142                             << std::hex << std::setw(4) << std::setfill('0') << its_method << "."
143                             << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_interface_version) << "."
144                             << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(its_msg_type) << "."
145                             << std::hex << std::setw(4) << std::setfill('0') << tp_id_iter->second.first << "]";
146                     tp_id_iter = port_iter->second.erase(tp_id_iter);
147                 } else {
148                     tp_id_iter++;
149                 }
150             }
151             if (port_iter->second.empty()) {
152                 port_iter = ip_iter->second.erase(port_iter);
153             } else {
154                 port_iter++;
155             }
156         }
157         if (ip_iter->second.empty()) {
158             ip_iter = tp_messages_.erase(ip_iter);
159         } else {
160             ip_iter++;
161         }
162     }
163     return !tp_messages_.empty();
164 }
165 
stop()166 void tp_reassembler::stop() {
167     std::lock_guard<std::mutex> its_lock(cleanup_timer_mutex_);
168     boost::system::error_code ec;
169     cleanup_timer_.cancel(ec);
170 }
171 
cleanup_timer_start(bool _force)172 void tp_reassembler::cleanup_timer_start(bool _force) {
173     std::lock_guard<std::mutex> its_lock(cleanup_timer_mutex_);
174     cleanup_timer_start_unlocked(_force);
175 }
176 
cleanup_timer_start_unlocked(bool _force)177 void tp_reassembler::cleanup_timer_start_unlocked(bool _force) {
178     boost::system::error_code ec;
179     if (!cleanup_timer_running_ || _force) {
180         cleanup_timer_.expires_from_now(std::chrono::seconds(5));
181         cleanup_timer_running_ = true;
182         cleanup_timer_.async_wait(
183                 std::bind(&tp_reassembler::cleanup_timer_cbk,
184                         shared_from_this(), std::placeholders::_1));
185     }
186 }
187 
cleanup_timer_cbk(const boost::system::error_code _error)188 void tp_reassembler::cleanup_timer_cbk(
189         const boost::system::error_code _error) {
190     if (!_error) {
191         std::lock_guard<std::mutex> its_lock(cleanup_timer_mutex_);
192         if (cleanup_unfinished_messages()) {
193             cleanup_timer_start_unlocked(true);
194         } else {
195             // don't start timer again as there are no more segmented messages present
196             cleanup_timer_running_ = false;
197         }
198     }
199 }
200 
201 } //namespace tp
202 } // namespace vsomeip_v3
203