1 // Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <gtest/gtest.h>
7 
8 #include <vsomeip/vsomeip.hpp>
9 
10 #include <thread>
11 #include <mutex>
12 #include <condition_variable>
13 #include <functional>
14 #include <iomanip>
15 #include <numeric>
16 #include <cmath> // for isfinite
17 #include <atomic>
18 
19 #include "cpu_load_test_globals.hpp"
20 #include <vsomeip/internal/logger.hpp>
21 #include "cpu_load_measurer.hpp"
22 
23 // for getpid
24 #include <sys/types.h>
25 #include <unistd.h>
26 
27 
28 enum protocol_e {
29     PR_UNKNOWN,
30     PR_TCP,
31     PR_UDP
32 };
33 
34 class cpu_load_test_client
35 {
36 public:
cpu_load_test_client(protocol_e _protocol,std::uint32_t _number_of_calls,std::uint32_t _payload_size,bool _call_service_sync,bool _shutdown_service)37     cpu_load_test_client(protocol_e _protocol, std::uint32_t _number_of_calls,
38                         std::uint32_t _payload_size, bool _call_service_sync,
39                         bool _shutdown_service) :
40             protocol_(_protocol),
41             app_(vsomeip::runtime::get()->create_application("cpu_load_test_client")),
42             request_(vsomeip::runtime::get()->create_request(protocol_ == protocol_e::PR_TCP)),
43             call_service_sync_(_call_service_sync),
44             shutdown_service_at_end_(_shutdown_service),
45             sliding_window_size_(_number_of_calls),
46             wait_for_availability_(true),
47             is_available_(false),
48             number_of_calls_(_number_of_calls),
49             number_of_calls_current_(0),
50             number_of_sent_messages_(0),
51             number_of_sent_messages_total_(0),
52             number_of_acknowledged_messages_(0),
53             payload_size_(_payload_size),
54             wait_for_all_msg_acknowledged_(true),
55             initialized_(false),
56             sender_(std::bind(&cpu_load_test_client::run, this)) {
57         if (!app_->init()) {
58             ADD_FAILURE() << "Couldn't initialize application";
59             return;
60         }
61         initialized_ = true;
62         app_->register_state_handler(
63                 std::bind(&cpu_load_test_client::on_state, this,
64                         std::placeholders::_1));
65 
66         app_->register_message_handler(vsomeip::ANY_SERVICE,
67                 vsomeip::ANY_INSTANCE, vsomeip::ANY_METHOD,
68                 std::bind(&cpu_load_test_client::on_message, this,
69                         std::placeholders::_1));
70 
71         app_->register_availability_handler(cpu_load_test::service_id,
72                 cpu_load_test::instance_id,
73                 std::bind(&cpu_load_test_client::on_availability, this,
74                         std::placeholders::_1, std::placeholders::_2,
75                         std::placeholders::_3));
76         VSOMEIP_INFO << "Starting...";
77         app_->start();
78     }
79 
~cpu_load_test_client()80     ~cpu_load_test_client() {
81         {
82             std::lock_guard<std::mutex> its_lock(mutex_);
83             wait_for_availability_ = false;
84             condition_.notify_one();
85         }
86         {
87             std::lock_guard<std::mutex> its_lock(all_msg_acknowledged_mutex_);
88             wait_for_all_msg_acknowledged_ = false;
89             all_msg_acknowledged_cv_.notify_one();
90         }
91         sender_.join();
92     }
93 
94 private:
stop()95     void stop() {
96         VSOMEIP_INFO << "Stopping...";
97         // shutdown the service
98         if(shutdown_service_at_end_)
99         {
100             shutdown_service();
101         }
102         app_->clear_all_handler();
103     }
104 
on_state(vsomeip::state_type_e _state)105     void on_state(vsomeip::state_type_e _state) {
106         if(_state == vsomeip::state_type_e::ST_REGISTERED)
107         {
108             app_->request_service(cpu_load_test::service_id,
109                     cpu_load_test::instance_id);
110         }
111     }
112 
on_availability(vsomeip::service_t _service,vsomeip::instance_t _instance,bool _is_available)113     void on_availability(vsomeip::service_t _service,
114                          vsomeip::instance_t _instance, bool _is_available) {
115         VSOMEIP_INFO << "Service [" << std::setw(4) << std::setfill('0')
116                 << std::hex << _service << "." << _instance << "] is "
117                 << (_is_available ? "available." : "NOT available.");
118 
119         if (cpu_load_test::service_id == _service
120                 && cpu_load_test::instance_id == _instance) {
121             if (is_available_ && !_is_available) {
122                 is_available_ = false;
123             } else if (_is_available && !is_available_) {
124                 is_available_ = true;
125                 std::lock_guard<std::mutex> its_lock(mutex_);
126                 wait_for_availability_ = false;
127                 condition_.notify_one();
128             }
129         }
130     }
on_message(const std::shared_ptr<vsomeip::message> & _response)131     void on_message(const std::shared_ptr<vsomeip::message> &_response) {
132 
133         number_of_acknowledged_messages_++;
134         ASSERT_EQ(_response->get_service(), cpu_load_test::service_id);
135         ASSERT_EQ(_response->get_method(), cpu_load_test::method_id);
136         if(call_service_sync_)
137         {
138             // We notify the sender thread every time a message was acknowledged
139             std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
140             wait_for_all_msg_acknowledged_ = false;
141             all_msg_acknowledged_cv_.notify_one();
142         }
143         else
144         {
145             // We notify the sender thread only if all sent messages have been acknowledged
146             if(number_of_acknowledged_messages_ == number_of_calls_current_)
147             {
148                 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
149                 number_of_acknowledged_messages_ = 0;
150                 wait_for_all_msg_acknowledged_ = false;
151                 all_msg_acknowledged_cv_.notify_one();
152             }
153             else if(number_of_acknowledged_messages_ % sliding_window_size_ == 0)
154             {
155                 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
156                 wait_for_all_msg_acknowledged_ = false;
157                 all_msg_acknowledged_cv_.notify_one();
158             }
159         }
160     }
161 
run()162     void run() {
163         std::unique_lock<std::mutex> its_lock(mutex_);
164         while (wait_for_availability_) {
165             condition_.wait(its_lock);
166         }
167 
168         request_->set_service(cpu_load_test::service_id);
169         request_->set_instance(cpu_load_test::instance_id);
170         request_->set_method(cpu_load_test::method_id);
171         std::shared_ptr<vsomeip::payload> payload = vsomeip::runtime::get()->create_payload();
172         std::vector<vsomeip::byte_t> payload_data;
173         payload_data.assign(payload_size_, cpu_load_test::load_test_data);
174         payload->set_data(payload_data);
175         request_->set_payload(payload);
176 
177         // lock the mutex
178         for(std::uint32_t i=0; i <= number_of_calls_; i++) {
179             number_of_calls_current_ = i;
180             sliding_window_size_ = i;
181             std::unique_lock<std::mutex> lk(all_msg_acknowledged_mutex_);
182             call_service_sync_ ? send_messages_sync(lk, i) : send_messages_async(lk, i);
183         }
184         const double average_load(std::accumulate(results_.begin(), results_.end(), 0.0) / static_cast<double>(results_.size()));
185         VSOMEIP_INFO << "Sent: " << number_of_sent_messages_total_
186             << " messages in total (excluding control messages). This caused: "
187             << std::fixed << std::setprecision(2)
188             << average_load << "% load in average (average of "
189             << results_.size() << " measurements).";
190 
191         std::vector<double> results_no_zero;
192         for(const auto &v : results_) {
193             if(v > 0.0) {
194                 results_no_zero.push_back(v);
195             }
196         }
197         const double average_load_no_zero(std::accumulate(results_no_zero.begin(), results_no_zero.end(), 0.0) / static_cast<double>(results_no_zero.size()));
198         VSOMEIP_INFO << "Sent: " << number_of_sent_messages_total_
199             << " messages in total (excluding control messages). This caused: "
200             << std::fixed << std::setprecision(2)
201             << average_load_no_zero << "% load in average, if measured "
202             << "cpu load was greater zero (average of "
203             << results_no_zero.size() << " measurements).";
204 
205         wait_for_availability_ = true;
206 
207         stop();
208         if (initialized_) {
209             app_->stop();
210         }
211     }
212 
213 
send_messages_sync(std::unique_lock<std::mutex> & lk,std::uint32_t _messages_to_send)214     void send_messages_sync(std::unique_lock<std::mutex>& lk, std::uint32_t _messages_to_send) {
215         cpu_load_measurer c(static_cast<std::uint32_t>(::getpid()));
216         send_service_start_measuring(true);
217         c.start();
218         for (number_of_sent_messages_ = 0;
219                 number_of_sent_messages_ < _messages_to_send;
220                 number_of_sent_messages_++, number_of_sent_messages_total_++)
221         {
222             app_->send(request_);
223             // wait until the send messages has been acknowledged
224             while(wait_for_all_msg_acknowledged_) {
225                 all_msg_acknowledged_cv_.wait(lk);
226             }
227             wait_for_all_msg_acknowledged_ = true;
228         }
229         c.stop();
230         send_service_start_measuring(false);
231         VSOMEIP_DEBUG << "Synchronously sent " << std::setw(4) << std::setfill('0')
232             << number_of_sent_messages_ << " messages. CPU load [%]: "
233             << std::fixed << std::setprecision(2)
234             << (std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
235         results_.push_back(std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
236 
237     }
238 
send_messages_async(std::unique_lock<std::mutex> & lk,std::uint32_t _messages_to_send)239     void send_messages_async(std::unique_lock<std::mutex>& lk, std::uint32_t _messages_to_send) {
240         cpu_load_measurer c(static_cast<std::uint32_t>(::getpid()));
241         send_service_start_measuring(true);
242         c.start();
243         for (number_of_sent_messages_ = 0;
244                 number_of_sent_messages_ < _messages_to_send;
245                 number_of_sent_messages_++, number_of_sent_messages_total_++)
246         {
247             app_->send(request_);
248             if((number_of_sent_messages_+1) % sliding_window_size_ == 0)
249             {
250                 // wait until all send messages have been acknowledged
251                 while(wait_for_all_msg_acknowledged_) {
252                     all_msg_acknowledged_cv_.wait(lk);
253                 }
254                 wait_for_all_msg_acknowledged_ = true;
255             }
256         }
257         c.stop();
258         send_service_start_measuring(false);
259         VSOMEIP_DEBUG << "Asynchronously sent " << std::setw(4) << std::setfill('0')
260             << number_of_sent_messages_ << " messages. CPU load [%]: "
261             << std::fixed << std::setprecision(2)
262             << (std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
263         results_.push_back(std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
264     }
265 
send_service_start_measuring(bool _start_measuring)266     void send_service_start_measuring(bool _start_measuring) {
267         std::shared_ptr<vsomeip::message> m = vsomeip::runtime::get()->create_request(protocol_ == protocol_e::PR_TCP);
268         m->set_service(cpu_load_test::service_id);
269         m->set_instance(cpu_load_test::instance_id);
270         _start_measuring ? m->set_method(cpu_load_test::method_id_cpu_measure_start) : m->set_method(cpu_load_test::method_id_cpu_measure_stop);
271         app_->send(m);
272     }
273 
shutdown_service()274     void shutdown_service() {
275         request_->set_service(cpu_load_test::service_id);
276         request_->set_instance(cpu_load_test::instance_id);
277         request_->set_method(cpu_load_test::method_id_shutdown);
278         app_->send(request_);
279     }
280 
281 private:
282     protocol_e protocol_;
283     std::shared_ptr<vsomeip::application> app_;
284     std::shared_ptr<vsomeip::message> request_;
285     bool call_service_sync_;
286     bool shutdown_service_at_end_;
287     std::uint32_t sliding_window_size_;
288     std::mutex mutex_;
289     std::condition_variable condition_;
290     bool wait_for_availability_;
291     bool is_available_;
292     const std::uint32_t number_of_calls_;
293     std::uint32_t number_of_calls_current_;
294     std::uint32_t number_of_sent_messages_;
295     std::uint32_t number_of_sent_messages_total_;
296     std::uint32_t number_of_acknowledged_messages_;
297 
298     std::uint32_t payload_size_;
299 
300     bool wait_for_all_msg_acknowledged_;
301     std::mutex all_msg_acknowledged_mutex_;
302     std::condition_variable all_msg_acknowledged_cv_;
303     std::vector<double> results_;
304     std::atomic<bool> initialized_;
305     std::thread sender_;
306 };
307 
308 
309 // this variables are changed via cmdline parameters
310 static protocol_e protocol(protocol_e::PR_UNKNOWN);
311 static std::uint32_t number_of_calls(0);
312 static std::uint32_t payload_size(40);
313 static bool call_service_sync(true);
314 static bool shutdown_service(true);
315 
316 
TEST(someip_load_test,DISABLED_send_messages_and_measure_cpu_load)317 TEST(someip_load_test, DISABLED_send_messages_and_measure_cpu_load)
318 {
319     cpu_load_test_client test_client_(protocol, number_of_calls, payload_size, call_service_sync, shutdown_service);
320 }
321 
322 #ifndef _WIN32
main(int argc,char ** argv)323 int main(int argc, char** argv)
324 {
325     int i = 0;
326     while (i < argc) {
327         if(std::string("--protocol") == std::string(argv[i])
328         || std::string("-p") == std::string(argv[i])) {
329             if(std::string("udp") == std::string(argv[i+1]) ||
330                     std::string("UDP") == std::string(argv[i+1])) {
331                 protocol = protocol_e::PR_UDP;
332                 i++;
333             } else if(std::string("tcp") == std::string(argv[i+1]) ||
334                     std::string("TCP") == std::string(argv[i+1])) {
335                 protocol = protocol_e::PR_TCP;
336                 i++;
337             }
338         } else if(std::string("--calls") == std::string(argv[i])
339         || std::string("-c") == std::string(argv[i])) {
340             try {
341                 number_of_calls = static_cast<std::uint32_t>(std::stoul(std::string(argv[i+1]), nullptr, 10));
342             } catch (const std::exception &e) {
343                 std::cerr << "Please specify a valid value for number of calls" << std::endl;
344                 return(EXIT_FAILURE);
345             }
346             i++;
347         } else if(std::string("--mode") == std::string(argv[i])
348         || std::string("-m") == std::string(argv[i])) {
349             if(std::string("sync") == std::string(argv[i+1]) ||
350                     std::string("SYNC") == std::string(argv[i+1])) {
351                 call_service_sync = true;
352                 i++;
353             } else if(std::string("async") == std::string(argv[i+1]) ||
354                     std::string("ASYNC") == std::string(argv[i+1])) {
355                 call_service_sync = false;
356                 i++;
357             }
358         } else if(std::string("--payload-size") == std::string(argv[i])
359         || std::string("-pl") == std::string(argv[i])) {
360             try {
361                 payload_size = static_cast<std::uint32_t>(std::stoul(std::string(argv[i+1]), nullptr, 10));
362             } catch (const std::exception &e) {
363                 std::cerr << "Please specify a valid values for payload size" << std::endl;
364                 return(EXIT_FAILURE);
365             }
366             i++;
367         } else if(std::string("--help") == std::string(argv[i])
368         || std::string("-h") == std::string(argv[i])) {
369             std::cout << "Available options:" << std::endl;
370             std::cout << "--protocol|-p: valid values TCP or UDP" << std::endl;
371             std::cout << "--calls|-c: number of message calls to do" << std::endl;
372             std::cout << "--mode|-m: mode sync or async" << std::endl;
373             std::cout << "--payload-size|-pl: payload size in Bytes default: 40" << std::endl;
374         }
375         i++;
376     }
377 
378     if(protocol == protocol_e::PR_UNKNOWN) {
379         std::cerr << "Please specify valid protocol mode, see --help" << std::endl;
380         return(EXIT_FAILURE);
381     }
382     if(!number_of_calls) {
383         std::cerr << "Please specify valid number of calls, see --help" << std::endl;
384         return(EXIT_FAILURE);
385     }
386 
387     ::testing::InitGoogleTest(&argc, argv);
388     return RUN_ALL_TESTS();
389 }
390 #endif
391