xref: /aosp_15_r20/external/perfetto/python/perfetto/bigtrace/api.py (revision 6dbdd20afdafa5e3ca9b8809fa73465d530080dc)
1#!/usr/bin/env python3
2# Copyright (C) 2024 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from typing import List
17
18import grpc
19import pandas as pd
20
21from perfetto.bigtrace.protos.perfetto.bigtrace.orchestrator_pb2 import BigtraceQueryArgs
22from perfetto.bigtrace.protos.perfetto.bigtrace.orchestrator_pb2_grpc import BigtraceOrchestratorStub
23from perfetto.common.query_result_iterator import QueryResultIterator
24from perfetto.common.exceptions import PerfettoException
25
26# C++ INT_MAX which is the maximum gRPC message size
27MAX_MESSAGE_SIZE = 2147483647
28
29
30class Bigtrace:
31
32  def __init__(self,
33               orchestrator_address="127.0.0.1:5051",
34               wait_for_ready_for_testing=False):
35    options = [('grpc.max_receive_message_length', MAX_MESSAGE_SIZE),
36               ('grpc.max_message_length', MAX_MESSAGE_SIZE)]
37    channel = grpc.insecure_channel(orchestrator_address, options=options)
38    self.stub = BigtraceOrchestratorStub(channel)
39    self.wait_for_ready_for_testing = wait_for_ready_for_testing
40
41  def query(self, traces: List[str], sql_query: str):
42    if not traces:
43      raise PerfettoException("Trace list cannot be empty")
44    if not sql_query:
45      raise PerfettoException("SQL query cannot be empty")
46    # Query and then convert to pandas
47    tables = []
48    args = BigtraceQueryArgs(traces=traces, sql_query=sql_query)
49
50    try:
51      responses = self.stub.Query(
52          args, wait_for_ready=self.wait_for_ready_for_testing)
53      for response in responses:
54        repeated_batches = []
55        results = response.result
56        column_names = results[0].column_names
57        for result in results:
58          repeated_batches.extend(result.batch)
59        iterator = QueryResultIterator(column_names, repeated_batches)
60        df = iterator.as_pandas_dataframe()
61        # TODO(b/366409021) Investigate whether this is the
62        # best place to insert these addresses for performance
63        df.insert(0, '_trace_address', response.trace)
64        tables.append(df)
65      flattened = pd.concat(tables)
66      return flattened.reset_index(drop=True)
67    except grpc.RpcError as e:
68      raise PerfettoException(f"gRPC {e.code().name} error - {e.details()}")
69