1 // Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include "payload_test_client.hpp"
7
8 enum class payloadsize
9 : std::uint8_t
10 {
11 UDS, TCP, UDP, USER_SPECIFIED
12 };
13
14 // this variables are changed via cmdline parameters
15 static bool use_tcp = false;
16 static bool call_service_sync = true;
17 static std::uint32_t sliding_window_size = vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_PAYLOAD_TESTS;
18 static payloadsize max_payload_size = payloadsize::UDS;
19 static bool shutdown_service_at_end = true;
20 static std::uint32_t user_defined_max_payload;
21 static std::uint32_t number_of_messages_to_send = 0;
22
payload_test_client(bool _use_tcp,bool _call_service_sync,std::uint32_t _sliding_window_size)23 payload_test_client::payload_test_client(
24 bool _use_tcp,
25 bool _call_service_sync,
26 std::uint32_t _sliding_window_size) :
27 app_(vsomeip::runtime::get()->create_application()),
28 request_(vsomeip::runtime::get()->create_request(_use_tcp)),
29 call_service_sync_(_call_service_sync),
30 sliding_window_size_(_sliding_window_size),
31 blocked_(false),
32 is_available_(false),
33 number_of_messages_to_send_(number_of_messages_to_send ? number_of_messages_to_send : vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_PAYLOAD_TESTS),
34 number_of_sent_messages_(0),
35 number_of_sent_messages_total_(0),
36 number_of_acknowledged_messages_(0),
37 current_payload_size_(1),
38 all_msg_acknowledged_(false),
39 sender_(std::bind(&payload_test_client::run, this))
40 {
41 }
42
init()43 bool payload_test_client::init()
44 {
45 if (!app_->init()) {
46 ADD_FAILURE() << "Couldn't initialize application";
47 return false;
48 }
49
50 app_->register_state_handler(
51 std::bind(&payload_test_client::on_state, this,
52 std::placeholders::_1));
53
54 app_->register_message_handler(vsomeip::ANY_SERVICE,
55 vsomeip_test::TEST_SERVICE_INSTANCE_ID, vsomeip::ANY_METHOD,
56 std::bind(&payload_test_client::on_message, this,
57 std::placeholders::_1));
58
59 app_->register_availability_handler(vsomeip_test::TEST_SERVICE_SERVICE_ID,
60 vsomeip_test::TEST_SERVICE_INSTANCE_ID,
61 std::bind(&payload_test_client::on_availability, this,
62 std::placeholders::_1, std::placeholders::_2,
63 std::placeholders::_3));
64 return true;
65 }
66
start()67 void payload_test_client::start()
68 {
69 VSOMEIP_INFO << "Starting...";
70 app_->start();
71 }
72
stop()73 void payload_test_client::stop()
74 {
75 VSOMEIP_INFO << "Stopping...";
76 // shutdown the service
77 if(shutdown_service_at_end)
78 {
79 shutdown_service();
80 }
81 app_->clear_all_handler();
82 }
83
shutdown_service()84 void payload_test_client::shutdown_service()
85 {
86 request_->set_service(vsomeip_test::TEST_SERVICE_SERVICE_ID);
87 request_->set_instance(vsomeip_test::TEST_SERVICE_INSTANCE_ID);
88 request_->set_method(vsomeip_test::TEST_SERVICE_METHOD_ID_SHUTDOWN);
89 app_->send(request_);
90 }
91
join_sender_thread()92 void payload_test_client::join_sender_thread()
93 {
94 sender_.join();
95 }
96
on_state(vsomeip::state_type_e _state)97 void payload_test_client::on_state(vsomeip::state_type_e _state)
98 {
99 if(_state == vsomeip::state_type_e::ST_REGISTERED)
100 {
101 app_->request_service(vsomeip_test::TEST_SERVICE_SERVICE_ID,
102 vsomeip_test::TEST_SERVICE_INSTANCE_ID, false);
103 }
104 }
105
on_availability(vsomeip::service_t _service,vsomeip::instance_t _instance,bool _is_available)106 void payload_test_client::on_availability(vsomeip::service_t _service,
107 vsomeip::instance_t _instance, bool _is_available)
108 {
109 VSOMEIP_INFO << "Service [" << std::setw(4) << std::setfill('0') << std::hex
110 << _service << "." << _instance << "] is "
111 << (_is_available ? "available." : "NOT available.");
112
113 if(vsomeip_test::TEST_SERVICE_SERVICE_ID == _service
114 && vsomeip_test::TEST_SERVICE_INSTANCE_ID == _instance)
115 {
116 if(is_available_ && !_is_available)
117 {
118 is_available_ = false;
119 }
120 else if(_is_available && !is_available_)
121 {
122 is_available_ = true;
123 send();
124 }
125 }
126 }
127
on_message(const std::shared_ptr<vsomeip::message> & _response)128 void payload_test_client::on_message(const std::shared_ptr<vsomeip::message>& _response)
129 {
130 number_of_acknowledged_messages_++;
131
132 ASSERT_EQ(_response->get_service(), vsomeip_test::TEST_SERVICE_SERVICE_ID);
133 ASSERT_EQ(_response->get_instance(), vsomeip_test::TEST_SERVICE_INSTANCE_ID);
134
135 if(call_service_sync_)
136 {
137 // We notify the sender thread every time a message was acknowledged
138 {
139 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
140 all_msg_acknowledged_ = true;
141 }
142 all_msg_acknowledged_cv_.notify_one();
143 }
144 else
145 {
146 // We notify the sender thread only if all sent messages have been acknowledged
147 if(number_of_acknowledged_messages_ == number_of_messages_to_send_)
148 {
149 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
150 number_of_acknowledged_messages_ = 0;
151 all_msg_acknowledged_ = true;
152 all_msg_acknowledged_cv_.notify_one();
153 }
154 else if(number_of_acknowledged_messages_ % sliding_window_size_ == 0)
155 {
156 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
157 all_msg_acknowledged_ = true;
158 all_msg_acknowledged_cv_.notify_one();
159 }
160 }
161 }
162
send()163 void payload_test_client::send()
164 {
165 std::lock_guard<std::mutex> its_lock(mutex_);
166 blocked_ = true;
167 condition_.notify_one();
168 }
169
run()170 void payload_test_client::run()
171 {
172 std::unique_lock<std::mutex> its_lock(mutex_);
173 while (!blocked_)
174 {
175 condition_.wait(its_lock);
176 }
177
178 request_->set_service(vsomeip_test::TEST_SERVICE_SERVICE_ID);
179 request_->set_instance(vsomeip_test::TEST_SERVICE_INSTANCE_ID);
180 request_->set_method(vsomeip_test::TEST_SERVICE_METHOD_ID);
181
182 // lock the mutex
183 std::unique_lock<std::mutex> lk(all_msg_acknowledged_mutex_);
184
185 std::uint32_t max_allowed_payload = get_max_allowed_payload();
186
187 std::shared_ptr<vsomeip::payload> payload = vsomeip::runtime::get()->create_payload();
188 std::vector<vsomeip::byte_t> payload_data;
189 bool reached_peak = false;
190 for(;;)
191 {
192 payload_data.assign(current_payload_size_ , vsomeip_test::PAYLOAD_TEST_DATA);
193 payload->set_data(payload_data);
194 request_->set_payload(payload);
195
196 watch_.reset();
197 watch_.start();
198
199 call_service_sync_ ? send_messages_sync(lk) : send_messages_async(lk);
200
201 watch_.stop();
202 print_throughput();
203
204 // Increase array size for next iteration
205 if(!reached_peak) {
206 current_payload_size_ *= 2;
207 } else {
208 current_payload_size_ /= 2;
209 }
210
211 if(!reached_peak && current_payload_size_ > max_allowed_payload)
212 {
213 current_payload_size_ = max_allowed_payload;
214 reached_peak = true;
215 } else if(reached_peak && current_payload_size_ <= 1) {
216 break;
217 }
218 }
219 blocked_ = false;
220
221 stop();
222 std::thread t1([](){ std::this_thread::sleep_for(std::chrono::microseconds(1000000 * 5));});
223 t1.join();
224 app_->stop();
225 std::thread t([](){ std::this_thread::sleep_for(std::chrono::microseconds(1000000 * 5));});
226 t.join();
227 }
228
229
get_max_allowed_payload()230 std::uint32_t payload_test_client::get_max_allowed_payload()
231 {
232 std::uint32_t payload;
233 switch (max_payload_size)
234 {
235 case payloadsize::UDS:
236 // TODO
237 payload = 1024 * 32 - 16;
238 break;
239 case payloadsize::TCP:
240 // TODO
241 payload = 4095 - 16;
242 break;
243 case payloadsize::UDP:
244 payload = VSOMEIP_MAX_UDP_MESSAGE_SIZE - 16;
245 break;
246 case payloadsize::USER_SPECIFIED:
247 payload = user_defined_max_payload;
248 break;
249 default:
250 payload = VSOMEIP_MAX_LOCAL_MESSAGE_SIZE;
251 break;
252 }
253 return payload;
254 }
255
send_messages_sync(std::unique_lock<std::mutex> & lk)256 void payload_test_client::send_messages_sync(std::unique_lock<std::mutex>& lk)
257 {
258 for (number_of_sent_messages_ = 0;
259 number_of_sent_messages_ < number_of_messages_to_send_;
260 number_of_sent_messages_++, number_of_sent_messages_total_++)
261 {
262 app_->send(request_);
263 // wait until the send messages has been acknowledged
264 // as long we wait lk is released; after wait returns lk is reacquired
265 all_msg_acknowledged_cv_.wait(lk, [&]
266 { return all_msg_acknowledged_;});
267 // Reset condition variable (lk is locked again here)
268 all_msg_acknowledged_ = false;
269 }
270 }
271
send_messages_async(std::unique_lock<std::mutex> & lk)272 void payload_test_client::send_messages_async(std::unique_lock<std::mutex>& lk)
273 {
274 for (number_of_sent_messages_ = 0;
275 number_of_sent_messages_ < number_of_messages_to_send_;
276 number_of_sent_messages_++, number_of_sent_messages_total_++)
277 {
278 app_->send(request_);
279
280 if((number_of_sent_messages_+1) % sliding_window_size_ == 0)
281 {
282 // wait until all send messages have been acknowledged
283 // as long we wait lk is released; after wait returns lk is reacquired
284 all_msg_acknowledged_cv_.wait(lk, [&]
285 { return all_msg_acknowledged_;});
286
287 // Reset condition variable
288 all_msg_acknowledged_ = false;
289 }
290 }
291 }
292
print_throughput()293 void payload_test_client::print_throughput()
294 {
295 constexpr std::uint32_t usec_per_sec = 1000000;
296 stop_watch::usec_t time_needed = watch_.get_total_elapsed_microseconds();
297 stop_watch::usec_t time_per_message = time_needed / number_of_sent_messages_;
298 std::double_t calls_per_sec = number_of_sent_messages_
299 * (usec_per_sec / static_cast<double>(time_needed));
300 std::double_t mbyte_per_sec = ((number_of_sent_messages_
301 * current_payload_size_)
302 / (static_cast<double>(time_needed) / usec_per_sec)) / (1024*1024);
303
304 VSOMEIP_INFO<< "[ Payload Test ] : :"
305 << "Payload size [byte]: " << std::dec << std::setw(8) << std::setfill('0') << current_payload_size_
306 << " Messages sent: " << std::dec << std::setw(8) << std::setfill('0') << number_of_sent_messages_
307 << " Meantime/message [usec]: " << std::dec << std::setw(8) << std::setfill('0') << time_per_message
308 << " Calls/sec: " << std::dec << std::setw(8) << std::setfill('0') << calls_per_sec
309 << " MiB/sec: " << std::dec << std::setw(8) << std::setfill('0') << mbyte_per_sec;
310 }
311
TEST(someip_payload_test,send_different_payloads)312 TEST(someip_payload_test, send_different_payloads)
313 {
314 payload_test_client test_client_(use_tcp, call_service_sync, sliding_window_size);
315 if (test_client_.init()) {
316 test_client_.start();
317 test_client_.join_sender_thread();
318 }
319 }
320
321
322 #ifndef _WIN32
main(int argc,char ** argv)323 int main(int argc, char** argv)
324 {
325 std::string tcp_enable("--tcp");
326 std::string udp_enable("--udp");
327 std::string sync_enable("--sync");
328 std::string async_enable("--async");
329 std::string sliding_window_size_param("--sliding-window-size");
330 std::string max_payload_size_param("--max-payload-size");
331 std::string shutdown_service_disable_param("--dont-shutdown-service");
332 std::string numbers_of_messages("--number-of-messages");
333 std::string help("--help");
334
335 int i = 1;
336 while (i < argc)
337 {
338 if(tcp_enable == argv[i])
339 {
340 use_tcp = true;
341 }
342 else if(udp_enable == argv[i])
343 {
344 use_tcp = false;
345 }
346 else if(sync_enable == argv[i])
347 {
348 call_service_sync = true;
349 }
350 else if(async_enable == argv[i])
351 {
352 call_service_sync = false;
353 }
354 else if(sliding_window_size_param == argv[i] && i + 1 < argc)
355 {
356 i++;
357 std::stringstream converter(argv[i]);
358 converter >> sliding_window_size;
359 }
360 else if(max_payload_size_param == argv[i] && i + 1 < argc)
361 {
362 i++;
363 if(std::string("UDS") == argv[i])
364 {
365 max_payload_size = payloadsize::UDS;
366 }
367 else if(std::string("TCP") == argv[i])
368 {
369 max_payload_size = payloadsize::TCP;
370 }
371 else if(std::string("UDP") == argv[i])
372 {
373 max_payload_size = payloadsize::UDP;
374 }
375 else {
376 max_payload_size = payloadsize::USER_SPECIFIED;
377 std::stringstream converter(argv[i]);
378 converter >> user_defined_max_payload;
379 }
380 }
381 else if (numbers_of_messages == argv[i]) {
382 i++;
383 std::stringstream converter(argv[i]);
384 converter >> number_of_messages_to_send;
385 }
386 else if(shutdown_service_disable_param == argv[i])
387 {
388 shutdown_service_at_end = false;
389 }
390 else if(help == argv[i])
391 {
392 VSOMEIP_INFO << "Parameters:\n"
393 << "--tcp: Send messages via TCP\n"
394 << "--udp: Send messages via UDP (default)\n"
395 << "--sync: Wait for acknowledge before sending next message (default)\n"
396 << "--async: Send multiple messages w/o waiting for"
397 " acknowledge of service\n"
398 << "--sliding-window-size: Number of messages to send before waiting "
399 "for acknowledge of service. Default: " << sliding_window_size << "\n"
400 << "--max-payload-size: limit the maximum payloadsize of send requests. One of {"
401 "UDS (=" << VSOMEIP_MAX_LOCAL_MESSAGE_SIZE << "byte), "
402 "UDP (=" << VSOMEIP_MAX_UDP_MESSAGE_SIZE << "byte), "
403 "TCP (=" << VSOMEIP_MAX_TCP_MESSAGE_SIZE << "byte)}, default: UDS\n"
404 << "--dont-shutdown-service: Don't shutdown the service upon "
405 "finishing of the payload test\n"
406 << "--number-of-messages: Number of messages to send per payload size iteration\n"
407 << "--help: print this help";
408 }
409 i++;
410 }
411
412 ::testing::InitGoogleTest(&argc, argv);
413 return RUN_ALL_TESTS();
414 }
415 #endif
416