1 // Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include "payload_test_client.hpp"
7 
8 enum class payloadsize
9     : std::uint8_t
10     {
11         UDS, TCP, UDP, USER_SPECIFIED
12 };
13 
14 // this variables are changed via cmdline parameters
15 static bool use_tcp = false;
16 static bool call_service_sync = true;
17 static std::uint32_t sliding_window_size = vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_PAYLOAD_TESTS;
18 static payloadsize max_payload_size = payloadsize::UDS;
19 static bool shutdown_service_at_end = true;
20 static std::uint32_t user_defined_max_payload;
21 static std::uint32_t number_of_messages_to_send = 0;
22 
payload_test_client(bool _use_tcp,bool _call_service_sync,std::uint32_t _sliding_window_size)23 payload_test_client::payload_test_client(
24         bool _use_tcp,
25         bool _call_service_sync,
26         std::uint32_t _sliding_window_size) :
27                 app_(vsomeip::runtime::get()->create_application()),
28                 request_(vsomeip::runtime::get()->create_request(_use_tcp)),
29                 call_service_sync_(_call_service_sync),
30                 sliding_window_size_(_sliding_window_size),
31                 blocked_(false),
32                 is_available_(false),
33                 number_of_messages_to_send_(number_of_messages_to_send ? number_of_messages_to_send : vsomeip_test::NUMBER_OF_MESSAGES_TO_SEND_PAYLOAD_TESTS),
34                 number_of_sent_messages_(0),
35                 number_of_sent_messages_total_(0),
36                 number_of_acknowledged_messages_(0),
37                 current_payload_size_(1),
38                 all_msg_acknowledged_(false),
39                 sender_(std::bind(&payload_test_client::run, this))
40 {
41 }
42 
init()43 bool payload_test_client::init()
44 {
45     if (!app_->init()) {
46         ADD_FAILURE() << "Couldn't initialize application";
47         return false;
48     }
49 
50     app_->register_state_handler(
51             std::bind(&payload_test_client::on_state, this,
52                     std::placeholders::_1));
53 
54     app_->register_message_handler(vsomeip::ANY_SERVICE,
55             vsomeip_test::TEST_SERVICE_INSTANCE_ID, vsomeip::ANY_METHOD,
56             std::bind(&payload_test_client::on_message, this,
57                     std::placeholders::_1));
58 
59     app_->register_availability_handler(vsomeip_test::TEST_SERVICE_SERVICE_ID,
60             vsomeip_test::TEST_SERVICE_INSTANCE_ID,
61             std::bind(&payload_test_client::on_availability, this,
62                     std::placeholders::_1, std::placeholders::_2,
63                     std::placeholders::_3));
64     return true;
65 }
66 
start()67 void payload_test_client::start()
68 {
69     VSOMEIP_INFO << "Starting...";
70     app_->start();
71 }
72 
stop()73 void payload_test_client::stop()
74 {
75     VSOMEIP_INFO << "Stopping...";
76     // shutdown the service
77     if(shutdown_service_at_end)
78     {
79         shutdown_service();
80     }
81     app_->clear_all_handler();
82 }
83 
shutdown_service()84 void payload_test_client::shutdown_service()
85 {
86     request_->set_service(vsomeip_test::TEST_SERVICE_SERVICE_ID);
87     request_->set_instance(vsomeip_test::TEST_SERVICE_INSTANCE_ID);
88     request_->set_method(vsomeip_test::TEST_SERVICE_METHOD_ID_SHUTDOWN);
89     app_->send(request_);
90 }
91 
join_sender_thread()92 void payload_test_client::join_sender_thread()
93 {
94     sender_.join();
95 }
96 
on_state(vsomeip::state_type_e _state)97 void payload_test_client::on_state(vsomeip::state_type_e _state)
98 {
99     if(_state == vsomeip::state_type_e::ST_REGISTERED)
100     {
101         app_->request_service(vsomeip_test::TEST_SERVICE_SERVICE_ID,
102                 vsomeip_test::TEST_SERVICE_INSTANCE_ID, false);
103     }
104 }
105 
on_availability(vsomeip::service_t _service,vsomeip::instance_t _instance,bool _is_available)106 void payload_test_client::on_availability(vsomeip::service_t _service,
107         vsomeip::instance_t _instance, bool _is_available)
108 {
109     VSOMEIP_INFO << "Service [" << std::setw(4) << std::setfill('0') << std::hex
110             << _service << "." << _instance << "] is "
111             << (_is_available ? "available." : "NOT available.");
112 
113     if(vsomeip_test::TEST_SERVICE_SERVICE_ID == _service
114             && vsomeip_test::TEST_SERVICE_INSTANCE_ID == _instance)
115     {
116         if(is_available_ && !_is_available)
117         {
118             is_available_ = false;
119         }
120         else if(_is_available && !is_available_)
121         {
122             is_available_ = true;
123             send();
124         }
125     }
126 }
127 
on_message(const std::shared_ptr<vsomeip::message> & _response)128 void payload_test_client::on_message(const std::shared_ptr<vsomeip::message>& _response)
129 {
130     number_of_acknowledged_messages_++;
131 
132     ASSERT_EQ(_response->get_service(), vsomeip_test::TEST_SERVICE_SERVICE_ID);
133     ASSERT_EQ(_response->get_instance(), vsomeip_test::TEST_SERVICE_INSTANCE_ID);
134 
135     if(call_service_sync_)
136     {
137         // We notify the sender thread every time a message was acknowledged
138         {
139             std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
140             all_msg_acknowledged_ = true;
141         }
142         all_msg_acknowledged_cv_.notify_one();
143     }
144     else
145     {
146         // We notify the sender thread only if all sent messages have been acknowledged
147         if(number_of_acknowledged_messages_ == number_of_messages_to_send_)
148         {
149             std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
150             number_of_acknowledged_messages_ = 0;
151             all_msg_acknowledged_ = true;
152             all_msg_acknowledged_cv_.notify_one();
153         }
154         else if(number_of_acknowledged_messages_ % sliding_window_size_ == 0)
155         {
156             std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
157             all_msg_acknowledged_ = true;
158             all_msg_acknowledged_cv_.notify_one();
159         }
160     }
161 }
162 
send()163 void payload_test_client::send()
164 {
165     std::lock_guard<std::mutex> its_lock(mutex_);
166     blocked_ = true;
167     condition_.notify_one();
168 }
169 
run()170 void payload_test_client::run()
171 {
172     std::unique_lock<std::mutex> its_lock(mutex_);
173     while (!blocked_)
174     {
175         condition_.wait(its_lock);
176     }
177 
178     request_->set_service(vsomeip_test::TEST_SERVICE_SERVICE_ID);
179     request_->set_instance(vsomeip_test::TEST_SERVICE_INSTANCE_ID);
180     request_->set_method(vsomeip_test::TEST_SERVICE_METHOD_ID);
181 
182     // lock the mutex
183     std::unique_lock<std::mutex> lk(all_msg_acknowledged_mutex_);
184 
185     std::uint32_t max_allowed_payload = get_max_allowed_payload();
186 
187     std::shared_ptr<vsomeip::payload> payload = vsomeip::runtime::get()->create_payload();
188     std::vector<vsomeip::byte_t> payload_data;
189     bool reached_peak = false;
190     for(;;)
191     {
192         payload_data.assign(current_payload_size_ , vsomeip_test::PAYLOAD_TEST_DATA);
193         payload->set_data(payload_data);
194         request_->set_payload(payload);
195 
196         watch_.reset();
197         watch_.start();
198 
199         call_service_sync_ ? send_messages_sync(lk) : send_messages_async(lk);
200 
201         watch_.stop();
202         print_throughput();
203 
204         // Increase array size for next iteration
205         if(!reached_peak) {
206             current_payload_size_ *= 2;
207         } else {
208             current_payload_size_ /= 2;
209         }
210 
211         if(!reached_peak && current_payload_size_ > max_allowed_payload)
212         {
213             current_payload_size_ = max_allowed_payload;
214             reached_peak = true;
215         } else if(reached_peak && current_payload_size_ <= 1) {
216             break;
217         }
218     }
219     blocked_ = false;
220 
221     stop();
222     std::thread t1([](){ std::this_thread::sleep_for(std::chrono::microseconds(1000000 * 5));});
223     t1.join();
224     app_->stop();
225     std::thread t([](){ std::this_thread::sleep_for(std::chrono::microseconds(1000000 * 5));});
226     t.join();
227 }
228 
229 
get_max_allowed_payload()230 std::uint32_t payload_test_client::get_max_allowed_payload()
231 {
232     std::uint32_t payload;
233     switch (max_payload_size)
234     {
235         case payloadsize::UDS:
236             // TODO
237             payload = 1024 * 32 - 16;
238             break;
239         case payloadsize::TCP:
240             // TODO
241             payload = 4095 - 16;
242             break;
243         case payloadsize::UDP:
244             payload = VSOMEIP_MAX_UDP_MESSAGE_SIZE - 16;
245             break;
246         case payloadsize::USER_SPECIFIED:
247             payload = user_defined_max_payload;
248             break;
249         default:
250             payload = VSOMEIP_MAX_LOCAL_MESSAGE_SIZE;
251             break;
252     }
253     return payload;
254 }
255 
send_messages_sync(std::unique_lock<std::mutex> & lk)256 void payload_test_client::send_messages_sync(std::unique_lock<std::mutex>& lk)
257 {
258     for (number_of_sent_messages_ = 0;
259             number_of_sent_messages_ < number_of_messages_to_send_;
260             number_of_sent_messages_++, number_of_sent_messages_total_++)
261     {
262         app_->send(request_);
263         // wait until the send messages has been acknowledged
264         // as long we wait lk is released; after wait returns lk is reacquired
265         all_msg_acknowledged_cv_.wait(lk, [&]
266         {   return all_msg_acknowledged_;});
267         // Reset condition variable (lk is locked again here)
268         all_msg_acknowledged_ = false;
269     }
270 }
271 
send_messages_async(std::unique_lock<std::mutex> & lk)272 void payload_test_client::send_messages_async(std::unique_lock<std::mutex>& lk)
273 {
274     for (number_of_sent_messages_ = 0;
275             number_of_sent_messages_ < number_of_messages_to_send_;
276             number_of_sent_messages_++, number_of_sent_messages_total_++)
277     {
278         app_->send(request_);
279 
280         if((number_of_sent_messages_+1) % sliding_window_size_ == 0)
281         {
282             // wait until all send messages have been acknowledged
283             // as long we wait lk is released; after wait returns lk is reacquired
284             all_msg_acknowledged_cv_.wait(lk, [&]
285             {   return all_msg_acknowledged_;});
286 
287             // Reset condition variable
288             all_msg_acknowledged_ = false;
289         }
290     }
291 }
292 
print_throughput()293 void payload_test_client::print_throughput()
294 {
295     constexpr std::uint32_t usec_per_sec = 1000000;
296     stop_watch::usec_t time_needed = watch_.get_total_elapsed_microseconds();
297     stop_watch::usec_t time_per_message = time_needed / number_of_sent_messages_;
298     std::double_t calls_per_sec = number_of_sent_messages_
299             * (usec_per_sec / static_cast<double>(time_needed));
300     std::double_t mbyte_per_sec = ((number_of_sent_messages_
301             * current_payload_size_)
302             / (static_cast<double>(time_needed) / usec_per_sec)) / (1024*1024);
303 
304     VSOMEIP_INFO<< "[ Payload Test ] : :"
305     << "Payload size [byte]: " << std::dec << std::setw(8) << std::setfill('0') << current_payload_size_
306     << " Messages sent: " << std::dec << std::setw(8) << std::setfill('0') << number_of_sent_messages_
307     << " Meantime/message [usec]: " << std::dec << std::setw(8) << std::setfill('0') << time_per_message
308     << " Calls/sec: " << std::dec << std::setw(8) << std::setfill('0') << calls_per_sec
309     << " MiB/sec: " << std::dec << std::setw(8) << std::setfill('0') << mbyte_per_sec;
310 }
311 
TEST(someip_payload_test,send_different_payloads)312 TEST(someip_payload_test, send_different_payloads)
313 {
314     payload_test_client test_client_(use_tcp, call_service_sync, sliding_window_size);
315     if (test_client_.init()) {
316         test_client_.start();
317         test_client_.join_sender_thread();
318     }
319 }
320 
321 
322 #ifndef _WIN32
main(int argc,char ** argv)323 int main(int argc, char** argv)
324 {
325     std::string tcp_enable("--tcp");
326     std::string udp_enable("--udp");
327     std::string sync_enable("--sync");
328     std::string async_enable("--async");
329     std::string sliding_window_size_param("--sliding-window-size");
330     std::string max_payload_size_param("--max-payload-size");
331     std::string shutdown_service_disable_param("--dont-shutdown-service");
332     std::string numbers_of_messages("--number-of-messages");
333     std::string help("--help");
334 
335     int i = 1;
336     while (i < argc)
337     {
338         if(tcp_enable == argv[i])
339         {
340             use_tcp = true;
341         }
342         else if(udp_enable == argv[i])
343         {
344             use_tcp = false;
345         }
346         else if(sync_enable == argv[i])
347         {
348             call_service_sync = true;
349         }
350         else if(async_enable == argv[i])
351         {
352             call_service_sync = false;
353         }
354         else if(sliding_window_size_param == argv[i] && i + 1 < argc)
355         {
356             i++;
357             std::stringstream converter(argv[i]);
358             converter >> sliding_window_size;
359         }
360         else if(max_payload_size_param == argv[i] && i + 1 < argc)
361         {
362             i++;
363             if(std::string("UDS") == argv[i])
364             {
365                 max_payload_size = payloadsize::UDS;
366             }
367             else if(std::string("TCP") == argv[i])
368             {
369                 max_payload_size = payloadsize::TCP;
370             }
371             else if(std::string("UDP") == argv[i])
372             {
373                 max_payload_size = payloadsize::UDP;
374             }
375             else {
376                 max_payload_size = payloadsize::USER_SPECIFIED;
377                 std::stringstream converter(argv[i]);
378                 converter >> user_defined_max_payload;
379             }
380         }
381         else if (numbers_of_messages == argv[i]) {
382             i++;
383             std::stringstream converter(argv[i]);
384             converter >> number_of_messages_to_send;
385         }
386         else if(shutdown_service_disable_param == argv[i])
387         {
388             shutdown_service_at_end = false;
389         }
390         else if(help == argv[i])
391         {
392             VSOMEIP_INFO << "Parameters:\n"
393             << "--tcp: Send messages via TCP\n"
394             << "--udp: Send messages via UDP (default)\n"
395             << "--sync: Wait for acknowledge before sending next message (default)\n"
396             << "--async: Send multiple messages w/o waiting for"
397                 " acknowledge of service\n"
398             << "--sliding-window-size: Number of messages to send before waiting "
399                 "for acknowledge of service. Default: " << sliding_window_size << "\n"
400             << "--max-payload-size: limit the maximum payloadsize of send requests. One of {"
401                 "UDS (=" << VSOMEIP_MAX_LOCAL_MESSAGE_SIZE << "byte), "
402                 "UDP (=" << VSOMEIP_MAX_UDP_MESSAGE_SIZE << "byte), "
403                 "TCP (=" << VSOMEIP_MAX_TCP_MESSAGE_SIZE << "byte)}, default: UDS\n"
404             << "--dont-shutdown-service: Don't shutdown the service upon "
405                 "finishing of the payload test\n"
406             << "--number-of-messages: Number of messages to send per payload size iteration\n"
407             << "--help: print this help";
408         }
409         i++;
410     }
411 
412     ::testing::InitGoogleTest(&argc, argv);
413     return RUN_ALL_TESTS();
414 }
415 #endif
416