1 // Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <gtest/gtest.h>
7
8 #include <vsomeip/vsomeip.hpp>
9
10 #include <thread>
11 #include <mutex>
12 #include <condition_variable>
13 #include <functional>
14 #include <iomanip>
15 #include <numeric>
16 #include <cmath> // for isfinite
17 #include <atomic>
18
19 #include "cpu_load_test_globals.hpp"
20 #include <vsomeip/internal/logger.hpp>
21 #include "cpu_load_measurer.hpp"
22
23 // for getpid
24 #include <sys/types.h>
25 #include <unistd.h>
26
27
// Transport protocol selected for the load test on the command line.
enum protocol_e {
    PR_UNKNOWN = 0, // no (valid) protocol has been selected yet
    PR_TCP = 1,     // requests are created with the "reliable" flag set
    PR_UDP = 2      // requests are created without the "reliable" flag
};
33
34 class cpu_load_test_client
35 {
36 public:
cpu_load_test_client(protocol_e _protocol,std::uint32_t _number_of_calls,std::uint32_t _payload_size,bool _call_service_sync,bool _shutdown_service)37 cpu_load_test_client(protocol_e _protocol, std::uint32_t _number_of_calls,
38 std::uint32_t _payload_size, bool _call_service_sync,
39 bool _shutdown_service) :
40 protocol_(_protocol),
41 app_(vsomeip::runtime::get()->create_application("cpu_load_test_client")),
42 request_(vsomeip::runtime::get()->create_request(protocol_ == protocol_e::PR_TCP)),
43 call_service_sync_(_call_service_sync),
44 shutdown_service_at_end_(_shutdown_service),
45 sliding_window_size_(_number_of_calls),
46 wait_for_availability_(true),
47 is_available_(false),
48 number_of_calls_(_number_of_calls),
49 number_of_calls_current_(0),
50 number_of_sent_messages_(0),
51 number_of_sent_messages_total_(0),
52 number_of_acknowledged_messages_(0),
53 payload_size_(_payload_size),
54 wait_for_all_msg_acknowledged_(true),
55 initialized_(false),
56 sender_(std::bind(&cpu_load_test_client::run, this)) {
57 if (!app_->init()) {
58 ADD_FAILURE() << "Couldn't initialize application";
59 return;
60 }
61 initialized_ = true;
62 app_->register_state_handler(
63 std::bind(&cpu_load_test_client::on_state, this,
64 std::placeholders::_1));
65
66 app_->register_message_handler(vsomeip::ANY_SERVICE,
67 vsomeip::ANY_INSTANCE, vsomeip::ANY_METHOD,
68 std::bind(&cpu_load_test_client::on_message, this,
69 std::placeholders::_1));
70
71 app_->register_availability_handler(cpu_load_test::service_id,
72 cpu_load_test::instance_id,
73 std::bind(&cpu_load_test_client::on_availability, this,
74 std::placeholders::_1, std::placeholders::_2,
75 std::placeholders::_3));
76 VSOMEIP_INFO << "Starting...";
77 app_->start();
78 }
79
~cpu_load_test_client()80 ~cpu_load_test_client() {
81 {
82 std::lock_guard<std::mutex> its_lock(mutex_);
83 wait_for_availability_ = false;
84 condition_.notify_one();
85 }
86 {
87 std::lock_guard<std::mutex> its_lock(all_msg_acknowledged_mutex_);
88 wait_for_all_msg_acknowledged_ = false;
89 all_msg_acknowledged_cv_.notify_one();
90 }
91 sender_.join();
92 }
93
94 private:
stop()95 void stop() {
96 VSOMEIP_INFO << "Stopping...";
97 // shutdown the service
98 if(shutdown_service_at_end_)
99 {
100 shutdown_service();
101 }
102 app_->clear_all_handler();
103 }
104
on_state(vsomeip::state_type_e _state)105 void on_state(vsomeip::state_type_e _state) {
106 if(_state == vsomeip::state_type_e::ST_REGISTERED)
107 {
108 app_->request_service(cpu_load_test::service_id,
109 cpu_load_test::instance_id);
110 }
111 }
112
on_availability(vsomeip::service_t _service,vsomeip::instance_t _instance,bool _is_available)113 void on_availability(vsomeip::service_t _service,
114 vsomeip::instance_t _instance, bool _is_available) {
115 VSOMEIP_INFO << "Service [" << std::setw(4) << std::setfill('0')
116 << std::hex << _service << "." << _instance << "] is "
117 << (_is_available ? "available." : "NOT available.");
118
119 if (cpu_load_test::service_id == _service
120 && cpu_load_test::instance_id == _instance) {
121 if (is_available_ && !_is_available) {
122 is_available_ = false;
123 } else if (_is_available && !is_available_) {
124 is_available_ = true;
125 std::lock_guard<std::mutex> its_lock(mutex_);
126 wait_for_availability_ = false;
127 condition_.notify_one();
128 }
129 }
130 }
on_message(const std::shared_ptr<vsomeip::message> & _response)131 void on_message(const std::shared_ptr<vsomeip::message> &_response) {
132
133 number_of_acknowledged_messages_++;
134 ASSERT_EQ(_response->get_service(), cpu_load_test::service_id);
135 ASSERT_EQ(_response->get_method(), cpu_load_test::method_id);
136 if(call_service_sync_)
137 {
138 // We notify the sender thread every time a message was acknowledged
139 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
140 wait_for_all_msg_acknowledged_ = false;
141 all_msg_acknowledged_cv_.notify_one();
142 }
143 else
144 {
145 // We notify the sender thread only if all sent messages have been acknowledged
146 if(number_of_acknowledged_messages_ == number_of_calls_current_)
147 {
148 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
149 number_of_acknowledged_messages_ = 0;
150 wait_for_all_msg_acknowledged_ = false;
151 all_msg_acknowledged_cv_.notify_one();
152 }
153 else if(number_of_acknowledged_messages_ % sliding_window_size_ == 0)
154 {
155 std::lock_guard<std::mutex> lk(all_msg_acknowledged_mutex_);
156 wait_for_all_msg_acknowledged_ = false;
157 all_msg_acknowledged_cv_.notify_one();
158 }
159 }
160 }
161
run()162 void run() {
163 std::unique_lock<std::mutex> its_lock(mutex_);
164 while (wait_for_availability_) {
165 condition_.wait(its_lock);
166 }
167
168 request_->set_service(cpu_load_test::service_id);
169 request_->set_instance(cpu_load_test::instance_id);
170 request_->set_method(cpu_load_test::method_id);
171 std::shared_ptr<vsomeip::payload> payload = vsomeip::runtime::get()->create_payload();
172 std::vector<vsomeip::byte_t> payload_data;
173 payload_data.assign(payload_size_, cpu_load_test::load_test_data);
174 payload->set_data(payload_data);
175 request_->set_payload(payload);
176
177 // lock the mutex
178 for(std::uint32_t i=0; i <= number_of_calls_; i++) {
179 number_of_calls_current_ = i;
180 sliding_window_size_ = i;
181 std::unique_lock<std::mutex> lk(all_msg_acknowledged_mutex_);
182 call_service_sync_ ? send_messages_sync(lk, i) : send_messages_async(lk, i);
183 }
184 const double average_load(std::accumulate(results_.begin(), results_.end(), 0.0) / static_cast<double>(results_.size()));
185 VSOMEIP_INFO << "Sent: " << number_of_sent_messages_total_
186 << " messages in total (excluding control messages). This caused: "
187 << std::fixed << std::setprecision(2)
188 << average_load << "% load in average (average of "
189 << results_.size() << " measurements).";
190
191 std::vector<double> results_no_zero;
192 for(const auto &v : results_) {
193 if(v > 0.0) {
194 results_no_zero.push_back(v);
195 }
196 }
197 const double average_load_no_zero(std::accumulate(results_no_zero.begin(), results_no_zero.end(), 0.0) / static_cast<double>(results_no_zero.size()));
198 VSOMEIP_INFO << "Sent: " << number_of_sent_messages_total_
199 << " messages in total (excluding control messages). This caused: "
200 << std::fixed << std::setprecision(2)
201 << average_load_no_zero << "% load in average, if measured "
202 << "cpu load was greater zero (average of "
203 << results_no_zero.size() << " measurements).";
204
205 wait_for_availability_ = true;
206
207 stop();
208 if (initialized_) {
209 app_->stop();
210 }
211 }
212
213
send_messages_sync(std::unique_lock<std::mutex> & lk,std::uint32_t _messages_to_send)214 void send_messages_sync(std::unique_lock<std::mutex>& lk, std::uint32_t _messages_to_send) {
215 cpu_load_measurer c(static_cast<std::uint32_t>(::getpid()));
216 send_service_start_measuring(true);
217 c.start();
218 for (number_of_sent_messages_ = 0;
219 number_of_sent_messages_ < _messages_to_send;
220 number_of_sent_messages_++, number_of_sent_messages_total_++)
221 {
222 app_->send(request_);
223 // wait until the send messages has been acknowledged
224 while(wait_for_all_msg_acknowledged_) {
225 all_msg_acknowledged_cv_.wait(lk);
226 }
227 wait_for_all_msg_acknowledged_ = true;
228 }
229 c.stop();
230 send_service_start_measuring(false);
231 VSOMEIP_DEBUG << "Synchronously sent " << std::setw(4) << std::setfill('0')
232 << number_of_sent_messages_ << " messages. CPU load [%]: "
233 << std::fixed << std::setprecision(2)
234 << (std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
235 results_.push_back(std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
236
237 }
238
send_messages_async(std::unique_lock<std::mutex> & lk,std::uint32_t _messages_to_send)239 void send_messages_async(std::unique_lock<std::mutex>& lk, std::uint32_t _messages_to_send) {
240 cpu_load_measurer c(static_cast<std::uint32_t>(::getpid()));
241 send_service_start_measuring(true);
242 c.start();
243 for (number_of_sent_messages_ = 0;
244 number_of_sent_messages_ < _messages_to_send;
245 number_of_sent_messages_++, number_of_sent_messages_total_++)
246 {
247 app_->send(request_);
248 if((number_of_sent_messages_+1) % sliding_window_size_ == 0)
249 {
250 // wait until all send messages have been acknowledged
251 while(wait_for_all_msg_acknowledged_) {
252 all_msg_acknowledged_cv_.wait(lk);
253 }
254 wait_for_all_msg_acknowledged_ = true;
255 }
256 }
257 c.stop();
258 send_service_start_measuring(false);
259 VSOMEIP_DEBUG << "Asynchronously sent " << std::setw(4) << std::setfill('0')
260 << number_of_sent_messages_ << " messages. CPU load [%]: "
261 << std::fixed << std::setprecision(2)
262 << (std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
263 results_.push_back(std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0);
264 }
265
send_service_start_measuring(bool _start_measuring)266 void send_service_start_measuring(bool _start_measuring) {
267 std::shared_ptr<vsomeip::message> m = vsomeip::runtime::get()->create_request(protocol_ == protocol_e::PR_TCP);
268 m->set_service(cpu_load_test::service_id);
269 m->set_instance(cpu_load_test::instance_id);
270 _start_measuring ? m->set_method(cpu_load_test::method_id_cpu_measure_start) : m->set_method(cpu_load_test::method_id_cpu_measure_stop);
271 app_->send(m);
272 }
273
shutdown_service()274 void shutdown_service() {
275 request_->set_service(cpu_load_test::service_id);
276 request_->set_instance(cpu_load_test::instance_id);
277 request_->set_method(cpu_load_test::method_id_shutdown);
278 app_->send(request_);
279 }
280
281 private:
282 protocol_e protocol_;
283 std::shared_ptr<vsomeip::application> app_;
284 std::shared_ptr<vsomeip::message> request_;
285 bool call_service_sync_;
286 bool shutdown_service_at_end_;
287 std::uint32_t sliding_window_size_;
288 std::mutex mutex_;
289 std::condition_variable condition_;
290 bool wait_for_availability_;
291 bool is_available_;
292 const std::uint32_t number_of_calls_;
293 std::uint32_t number_of_calls_current_;
294 std::uint32_t number_of_sent_messages_;
295 std::uint32_t number_of_sent_messages_total_;
296 std::uint32_t number_of_acknowledged_messages_;
297
298 std::uint32_t payload_size_;
299
300 bool wait_for_all_msg_acknowledged_;
301 std::mutex all_msg_acknowledged_mutex_;
302 std::condition_variable all_msg_acknowledged_cv_;
303 std::vector<double> results_;
304 std::atomic<bool> initialized_;
305 std::thread sender_;
306 };
307
308
309 // this variables are changed via cmdline parameters
310 static protocol_e protocol(protocol_e::PR_UNKNOWN);
311 static std::uint32_t number_of_calls(0);
312 static std::uint32_t payload_size(40);
313 static bool call_service_sync(true);
314 static bool shutdown_service(true);
315
316
// The whole test runs inside the constructor and the destructor of the
// client: the constructor starts the application and the destructor joins
// the sender thread.
TEST(someip_load_test, DISABLED_send_messages_and_measure_cpu_load)
{
    cpu_load_test_client its_client(protocol, number_of_calls, payload_size,
            call_service_sync, shutdown_service);
}
321
322 #ifndef _WIN32
main(int argc,char ** argv)323 int main(int argc, char** argv)
324 {
325 int i = 0;
326 while (i < argc) {
327 if(std::string("--protocol") == std::string(argv[i])
328 || std::string("-p") == std::string(argv[i])) {
329 if(std::string("udp") == std::string(argv[i+1]) ||
330 std::string("UDP") == std::string(argv[i+1])) {
331 protocol = protocol_e::PR_UDP;
332 i++;
333 } else if(std::string("tcp") == std::string(argv[i+1]) ||
334 std::string("TCP") == std::string(argv[i+1])) {
335 protocol = protocol_e::PR_TCP;
336 i++;
337 }
338 } else if(std::string("--calls") == std::string(argv[i])
339 || std::string("-c") == std::string(argv[i])) {
340 try {
341 number_of_calls = static_cast<std::uint32_t>(std::stoul(std::string(argv[i+1]), nullptr, 10));
342 } catch (const std::exception &e) {
343 std::cerr << "Please specify a valid value for number of calls" << std::endl;
344 return(EXIT_FAILURE);
345 }
346 i++;
347 } else if(std::string("--mode") == std::string(argv[i])
348 || std::string("-m") == std::string(argv[i])) {
349 if(std::string("sync") == std::string(argv[i+1]) ||
350 std::string("SYNC") == std::string(argv[i+1])) {
351 call_service_sync = true;
352 i++;
353 } else if(std::string("async") == std::string(argv[i+1]) ||
354 std::string("ASYNC") == std::string(argv[i+1])) {
355 call_service_sync = false;
356 i++;
357 }
358 } else if(std::string("--payload-size") == std::string(argv[i])
359 || std::string("-pl") == std::string(argv[i])) {
360 try {
361 payload_size = static_cast<std::uint32_t>(std::stoul(std::string(argv[i+1]), nullptr, 10));
362 } catch (const std::exception &e) {
363 std::cerr << "Please specify a valid values for payload size" << std::endl;
364 return(EXIT_FAILURE);
365 }
366 i++;
367 } else if(std::string("--help") == std::string(argv[i])
368 || std::string("-h") == std::string(argv[i])) {
369 std::cout << "Available options:" << std::endl;
370 std::cout << "--protocol|-p: valid values TCP or UDP" << std::endl;
371 std::cout << "--calls|-c: number of message calls to do" << std::endl;
372 std::cout << "--mode|-m: mode sync or async" << std::endl;
373 std::cout << "--payload-size|-pl: payload size in Bytes default: 40" << std::endl;
374 }
375 i++;
376 }
377
378 if(protocol == protocol_e::PR_UNKNOWN) {
379 std::cerr << "Please specify valid protocol mode, see --help" << std::endl;
380 return(EXIT_FAILURE);
381 }
382 if(!number_of_calls) {
383 std::cerr << "Please specify valid number of calls, see --help" << std::endl;
384 return(EXIT_FAILURE);
385 }
386
387 ::testing::InitGoogleTest(&argc, argv);
388 return RUN_ALL_TESTS();
389 }
390 #endif
391