1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18
19 #include <string.h>
20
21 #include <cinttypes>
22
23 #include "perfetto/base/task_runner.h"
24 #include "perfetto/ext/ipc/client.h"
25 #include "perfetto/ext/tracing/core/consumer.h"
26 #include "perfetto/ext/tracing/core/observable_events.h"
27 #include "perfetto/ext/tracing/core/trace_stats.h"
28 #include "perfetto/tracing/core/trace_config.h"
29 #include "perfetto/tracing/core/tracing_service_state.h"
30
// TODO(fmayer): Add a test to check what happens when ConsumerIPCClientImpl
// gets destroyed w.r.t. the Consumer pointer. Also think about the lifetime of
// the Consumer* during the callbacks.
34
35 namespace perfetto {
36
37 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)38 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
39 const char* service_sock_name,
40 Consumer* consumer,
41 base::TaskRunner* task_runner) {
42 return std::unique_ptr<TracingService::ConsumerEndpoint>(
43 new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
44 }
45
// Remembers |consumer|, creates the IPC client channel for
// |service_sock_name| on |task_runner| (with socket retries disabled) and
// immediately asks the channel to bind |consumer_port_|. This object is passed
// to |consumer_port_| as its event listener; OnConnect() is called by the IPC
// layer if the bind succeeds.
ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
                                             Consumer* consumer,
                                             base::TaskRunner* task_runner)
    : consumer_(consumer),
      ipc_channel_(
          ipc::Client::CreateInstance({service_sock_name, /*sock_retry=*/false},
                                      task_runner)),
      consumer_port_(this /* event_listener */),
      weak_ptr_factory_(this) {
  // The bind is asynchronous: |connected_| only becomes true in OnConnect().
  ipc_channel_->BindService(consumer_port_.GetWeakPtr());
}
57
// Defaulted out-of-line: no explicit teardown is performed here, the members
// are released by their own destructors.
ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
59
// Called by the IPC layer if the BindService() succeeds.
// Marks the endpoint as connected (which unblocks the request methods of this
// class) and then forwards the notification to the Consumer.
void ConsumerIPCClientImpl::OnConnect() {
  // Set the flag first so that requests issued by the consumer from within
  // its OnConnect() callback are not rejected as "not connected".
  connected_ = true;
  consumer_->OnConnect();
}
65
// Marks the endpoint as disconnected and notifies the Consumer.
// NOTE(review): presumably invoked by the IPC layer when the connection to the
// service fails or is lost -- confirm against the event listener interface.
void ConsumerIPCClientImpl::OnDisconnect() {
  PERFETTO_DLOG("Tracing service connection failure");
  connected_ = false;
  // Must stay the last statement: no member may be touched after this call.
  consumer_->OnDisconnect();  // Note: may delete |this|.
}
71
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)72 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
73 base::ScopedFile fd) {
74 if (!connected_) {
75 PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
76 return;
77 }
78
79 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
80 if (fd) {
81 consumer_->OnTracingDisabled(
82 "Passing FDs for write_into_file is not supported on Windows");
83 return;
84 }
85 #endif
86
87 protos::gen::EnableTracingRequest req;
88 *req.mutable_trace_config() = trace_config;
89 ipc::Deferred<protos::gen::EnableTracingResponse> async_response;
90 auto weak_this = weak_ptr_factory_.GetWeakPtr();
91 async_response.Bind(
92 [weak_this](
93 ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
94 if (weak_this)
95 weak_this->OnEnableTracingResponse(std::move(response));
96 });
97
98 // |fd| will be closed when this function returns, but it's fine because the
99 // IPC layer dup()'s it when sending the IPC.
100 consumer_port_.EnableTracing(req, std::move(async_response), *fd);
101 }
102
ChangeTraceConfig(const TraceConfig & trace_config)103 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig& trace_config) {
104 if (!connected_) {
105 PERFETTO_DLOG(
106 "Cannot ChangeTraceConfig(), not connected to tracing service");
107 return;
108 }
109
110 ipc::Deferred<protos::gen::ChangeTraceConfigResponse> async_response;
111 async_response.Bind(
112 [](ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse> response) {
113 if (!response)
114 PERFETTO_DLOG("ChangeTraceConfig() failed");
115 });
116 protos::gen::ChangeTraceConfigRequest req;
117 *req.mutable_trace_config() = trace_config;
118 consumer_port_.ChangeTraceConfig(req, std::move(async_response));
119 }
120
StartTracing()121 void ConsumerIPCClientImpl::StartTracing() {
122 if (!connected_) {
123 PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
124 return;
125 }
126
127 ipc::Deferred<protos::gen::StartTracingResponse> async_response;
128 async_response.Bind(
129 [](ipc::AsyncResult<protos::gen::StartTracingResponse> response) {
130 if (!response)
131 PERFETTO_DLOG("StartTracing() failed");
132 });
133 protos::gen::StartTracingRequest req;
134 consumer_port_.StartTracing(req, std::move(async_response));
135 }
136
DisableTracing()137 void ConsumerIPCClientImpl::DisableTracing() {
138 if (!connected_) {
139 PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
140 return;
141 }
142
143 ipc::Deferred<protos::gen::DisableTracingResponse> async_response;
144 async_response.Bind(
145 [](ipc::AsyncResult<protos::gen::DisableTracingResponse> response) {
146 if (!response)
147 PERFETTO_DLOG("DisableTracing() failed");
148 });
149 consumer_port_.DisableTracing(protos::gen::DisableTracingRequest(),
150 std::move(async_response));
151 }
152
ReadBuffers()153 void ConsumerIPCClientImpl::ReadBuffers() {
154 if (!connected_) {
155 PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
156 return;
157 }
158
159 ipc::Deferred<protos::gen::ReadBuffersResponse> async_response;
160
161 // The IPC layer guarantees that callbacks are destroyed after this object
162 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
163 // contract of this class expects the caller to not destroy the Consumer class
164 // before having destroyed this class. Hence binding |this| here is safe.
165 async_response.Bind(
166 [this](ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
167 OnReadBuffersResponse(std::move(response));
168 });
169 consumer_port_.ReadBuffers(protos::gen::ReadBuffersRequest(),
170 std::move(async_response));
171 }
172
OnReadBuffersResponse(ipc::AsyncResult<protos::gen::ReadBuffersResponse> response)173 void ConsumerIPCClientImpl::OnReadBuffersResponse(
174 ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
175 if (!response) {
176 PERFETTO_DLOG("ReadBuffers() failed");
177 return;
178 }
179 std::vector<TracePacket> trace_packets;
180 for (auto& resp_slice : response->slices()) {
181 const std::string& slice_data = resp_slice.data();
182 Slice slice = Slice::Allocate(slice_data.size());
183 memcpy(slice.own_data(), slice_data.data(), slice.size);
184 partial_packet_.AddSlice(std::move(slice));
185 if (resp_slice.last_slice_for_packet())
186 trace_packets.emplace_back(std::move(partial_packet_));
187 }
188 if (!trace_packets.empty() || !response.has_more())
189 consumer_->OnTraceData(std::move(trace_packets), response.has_more());
190 }
191
OnEnableTracingResponse(ipc::AsyncResult<protos::gen::EnableTracingResponse> response)192 void ConsumerIPCClientImpl::OnEnableTracingResponse(
193 ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
194 std::string error;
195 // |response| might be empty when the request gets rejected (if the connection
196 // with the service is dropped all outstanding requests are auto-rejected).
197 if (!response) {
198 error =
199 "EnableTracing IPC request rejected. This is likely due to a loss of "
200 "the traced connection";
201 } else {
202 error = response->error();
203 }
204 if (!response || response->disabled())
205 consumer_->OnTracingDisabled(error);
206 }
207
FreeBuffers()208 void ConsumerIPCClientImpl::FreeBuffers() {
209 if (!connected_) {
210 PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
211 return;
212 }
213
214 protos::gen::FreeBuffersRequest req;
215 ipc::Deferred<protos::gen::FreeBuffersResponse> async_response;
216 async_response.Bind(
217 [](ipc::AsyncResult<protos::gen::FreeBuffersResponse> response) {
218 if (!response)
219 PERFETTO_DLOG("FreeBuffers() failed");
220 });
221 consumer_port_.FreeBuffers(req, std::move(async_response));
222 }
223
Flush(uint32_t timeout_ms,FlushCallback callback,FlushFlags flush_flags)224 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms,
225 FlushCallback callback,
226 FlushFlags flush_flags) {
227 if (!connected_) {
228 PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
229 return callback(/*success=*/false);
230 }
231
232 protos::gen::FlushRequest req;
233 req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
234 req.set_flags(flush_flags.flags());
235 ipc::Deferred<protos::gen::FlushResponse> async_response;
236 async_response.Bind(
237 [callback](ipc::AsyncResult<protos::gen::FlushResponse> response) {
238 callback(!!response);
239 });
240 consumer_port_.Flush(req, std::move(async_response));
241 }
242
Detach(const std::string & key)243 void ConsumerIPCClientImpl::Detach(const std::string& key) {
244 if (!connected_) {
245 PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
246 return;
247 }
248
249 protos::gen::DetachRequest req;
250 req.set_key(key);
251 ipc::Deferred<protos::gen::DetachResponse> async_response;
252 auto weak_this = weak_ptr_factory_.GetWeakPtr();
253
254 async_response.Bind(
255 [weak_this](ipc::AsyncResult<protos::gen::DetachResponse> response) {
256 if (weak_this)
257 weak_this->consumer_->OnDetach(!!response);
258 });
259 consumer_port_.Detach(req, std::move(async_response));
260 }
261
Attach(const std::string & key)262 void ConsumerIPCClientImpl::Attach(const std::string& key) {
263 if (!connected_) {
264 PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
265 return;
266 }
267
268 {
269 protos::gen::AttachRequest req;
270 req.set_key(key);
271 ipc::Deferred<protos::gen::AttachResponse> async_response;
272 auto weak_this = weak_ptr_factory_.GetWeakPtr();
273
274 async_response.Bind(
275 [weak_this](ipc::AsyncResult<protos::gen::AttachResponse> response) {
276 if (!weak_this)
277 return;
278 if (!response) {
279 weak_this->consumer_->OnAttach(/*success=*/false, TraceConfig());
280 return;
281 }
282 const TraceConfig& trace_config = response->trace_config();
283
284 // If attached successfully, also attach to the end-of-trace
285 // notificaton callback, via EnableTracing(attach_notification_only).
286 protos::gen::EnableTracingRequest enable_req;
287 enable_req.set_attach_notification_only(true);
288 ipc::Deferred<protos::gen::EnableTracingResponse> enable_resp;
289 enable_resp.Bind(
290 [weak_this](
291 ipc::AsyncResult<protos::gen::EnableTracingResponse> resp) {
292 if (weak_this)
293 weak_this->OnEnableTracingResponse(std::move(resp));
294 });
295 weak_this->consumer_port_.EnableTracing(enable_req,
296 std::move(enable_resp));
297
298 weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
299 });
300 consumer_port_.Attach(req, std::move(async_response));
301 }
302 }
303
GetTraceStats()304 void ConsumerIPCClientImpl::GetTraceStats() {
305 if (!connected_) {
306 PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
307 return;
308 }
309
310 protos::gen::GetTraceStatsRequest req;
311 ipc::Deferred<protos::gen::GetTraceStatsResponse> async_response;
312
313 // The IPC layer guarantees that callbacks are destroyed after this object
314 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
315 // contract of this class expects the caller to not destroy the Consumer class
316 // before having destroyed this class. Hence binding |this| here is safe.
317 async_response.Bind(
318 [this](ipc::AsyncResult<protos::gen::GetTraceStatsResponse> response) {
319 if (!response) {
320 consumer_->OnTraceStats(/*success=*/false, TraceStats());
321 return;
322 }
323 consumer_->OnTraceStats(/*success=*/true, response->trace_stats());
324 });
325 consumer_port_.GetTraceStats(req, std::move(async_response));
326 }
327
ObserveEvents(uint32_t enabled_event_types)328 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
329 if (!connected_) {
330 PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
331 return;
332 }
333
334 protos::gen::ObserveEventsRequest req;
335 for (uint32_t i = 0; i < 32; i++) {
336 const uint32_t event_id = 1u << i;
337 if (enabled_event_types & event_id)
338 req.add_events_to_observe(static_cast<ObservableEvents::Type>(event_id));
339 }
340
341 ipc::Deferred<protos::gen::ObserveEventsResponse> async_response;
342 // The IPC layer guarantees that callbacks are destroyed after this object
343 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
344 // contract of this class expects the caller to not destroy the Consumer class
345 // before having destroyed this class. Hence binding |this| here is safe.
346 async_response.Bind(
347 [this](ipc::AsyncResult<protos::gen::ObserveEventsResponse> response) {
348 // Skip empty response, which the service sends to close the stream.
349 if (!response.has_more()) {
350 PERFETTO_DCHECK(!response.success());
351 return;
352 }
353 consumer_->OnObservableEvents(response->events());
354 });
355 consumer_port_.ObserveEvents(req, std::move(async_response));
356 }
357
QueryServiceState(QueryServiceStateArgs args,QueryServiceStateCallback callback)358 void ConsumerIPCClientImpl::QueryServiceState(
359 QueryServiceStateArgs args,
360 QueryServiceStateCallback callback) {
361 if (!connected_) {
362 PERFETTO_DLOG(
363 "Cannot QueryServiceState(), not connected to tracing service");
364 return;
365 }
366
367 auto it = pending_query_svc_reqs_.insert(pending_query_svc_reqs_.end(),
368 {std::move(callback), {}});
369 protos::gen::QueryServiceStateRequest req;
370 req.set_sessions_only(args.sessions_only);
371 ipc::Deferred<protos::gen::QueryServiceStateResponse> async_response;
372 auto weak_this = weak_ptr_factory_.GetWeakPtr();
373 async_response.Bind(
374 [weak_this,
375 it](ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response) {
376 if (weak_this)
377 weak_this->OnQueryServiceStateResponse(std::move(response), it);
378 });
379 consumer_port_.QueryServiceState(req, std::move(async_response));
380 }
381
OnQueryServiceStateResponse(ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,PendingQueryServiceRequests::iterator req_it)382 void ConsumerIPCClientImpl::OnQueryServiceStateResponse(
383 ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,
384 PendingQueryServiceRequests::iterator req_it) {
385 PERFETTO_DCHECK(req_it->callback);
386
387 if (!response) {
388 auto callback = std::move(req_it->callback);
389 pending_query_svc_reqs_.erase(req_it);
390 callback(false, TracingServiceState());
391 return;
392 }
393
394 // The QueryServiceState response can be split in several chunks if the
395 // service has several data sources. The client is supposed to merge all the
396 // replies. The easiest way to achieve this is to re-serialize the partial
397 // response and then re-decode the merged result in one shot.
398 std::vector<uint8_t>& merged_resp = req_it->merged_resp;
399 std::vector<uint8_t> part = response->service_state().SerializeAsArray();
400 merged_resp.insert(merged_resp.end(), part.begin(), part.end());
401
402 if (response.has_more())
403 return;
404
405 // All replies have been received. Decode the merged result and reply to the
406 // callback.
407 protos::gen::TracingServiceState svc_state;
408 bool ok = svc_state.ParseFromArray(merged_resp.data(), merged_resp.size());
409 if (!ok)
410 PERFETTO_ELOG("Failed to decode merged QueryServiceStateResponse");
411 auto callback = std::move(req_it->callback);
412 pending_query_svc_reqs_.erase(req_it);
413 callback(ok, std::move(svc_state));
414 }
415
QueryCapabilities(QueryCapabilitiesCallback callback)416 void ConsumerIPCClientImpl::QueryCapabilities(
417 QueryCapabilitiesCallback callback) {
418 if (!connected_) {
419 PERFETTO_DLOG(
420 "Cannot QueryCapabilities(), not connected to tracing service");
421 return;
422 }
423
424 protos::gen::QueryCapabilitiesRequest req;
425 ipc::Deferred<protos::gen::QueryCapabilitiesResponse> async_response;
426 async_response.Bind(
427 [callback](
428 ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse> response) {
429 if (!response) {
430 // If the IPC fails, we are talking to an older version of the service
431 // that didn't support QueryCapabilities at all. In this case return
432 // an empty capabilities message.
433 callback(TracingServiceCapabilities());
434 } else {
435 callback(response->capabilities());
436 }
437 });
438 consumer_port_.QueryCapabilities(req, std::move(async_response));
439 }
440
SaveTraceForBugreport(SaveTraceForBugreportCallback callback)441 void ConsumerIPCClientImpl::SaveTraceForBugreport(
442 SaveTraceForBugreportCallback callback) {
443 if (!connected_) {
444 PERFETTO_DLOG(
445 "Cannot SaveTraceForBugreport(), not connected to tracing service");
446 return;
447 }
448
449 protos::gen::SaveTraceForBugreportRequest req;
450 ipc::Deferred<protos::gen::SaveTraceForBugreportResponse> async_response;
451 async_response.Bind(
452 [callback](ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>
453 response) {
454 if (!response) {
455 // If the IPC fails, we are talking to an older version of the service
456 // that didn't support SaveTraceForBugreport at all.
457 callback(
458 false,
459 "The tracing service doesn't support SaveTraceForBugreport()");
460 } else {
461 callback(response->success(), response->msg());
462 }
463 });
464 consumer_port_.SaveTraceForBugreport(req, std::move(async_response));
465 }
466
CloneSession(CloneSessionArgs args)467 void ConsumerIPCClientImpl::CloneSession(CloneSessionArgs args) {
468 if (!connected_) {
469 PERFETTO_DLOG("Cannot CloneSession(), not connected to tracing service");
470 return;
471 }
472
473 protos::gen::CloneSessionRequest req;
474 if (args.tsid) {
475 req.set_session_id(args.tsid);
476 }
477 if (!args.unique_session_name.empty()) {
478 req.set_unique_session_name(args.unique_session_name);
479 }
480 req.set_skip_trace_filter(args.skip_trace_filter);
481 req.set_for_bugreport(args.for_bugreport);
482 if (!args.clone_trigger_name.empty()) {
483 req.set_clone_trigger_name(args.clone_trigger_name);
484 }
485 if (!args.clone_trigger_producer_name.empty()) {
486 req.set_clone_trigger_producer_name(args.clone_trigger_producer_name);
487 }
488 if (args.clone_trigger_trusted_producer_uid != 0) {
489 req.set_clone_trigger_trusted_producer_uid(
490 static_cast<int32_t>(args.clone_trigger_trusted_producer_uid));
491 }
492 if (args.clone_trigger_boot_time_ns != 0) {
493 req.set_clone_trigger_boot_time_ns(args.clone_trigger_boot_time_ns);
494 }
495 ipc::Deferred<protos::gen::CloneSessionResponse> async_response;
496 auto weak_this = weak_ptr_factory_.GetWeakPtr();
497
498 async_response.Bind(
499 [weak_this](
500 ipc::AsyncResult<protos::gen::CloneSessionResponse> response) {
501 if (!weak_this)
502 return;
503 if (!response) {
504 // If the IPC fails, we are talking to an older version of the service
505 // that didn't support CloneSession at all.
506 weak_this->consumer_->OnSessionCloned(
507 {false, "CloneSession IPC not supported", {}});
508 } else {
509 base::Uuid uuid(response->uuid_lsb(), response->uuid_msb());
510 weak_this->consumer_->OnSessionCloned(
511 {response->success(), response->error(), uuid});
512 }
513 });
514 consumer_port_.CloneSession(req, std::move(async_response));
515 }
516 } // namespace perfetto
517