#!/usr/bin/env python3
# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

import grpc
import pandas as pd

from perfetto.bigtrace.protos.perfetto.bigtrace.orchestrator_pb2 import BigtraceQueryArgs
from perfetto.bigtrace.protos.perfetto.bigtrace.orchestrator_pb2_grpc import BigtraceOrchestratorStub
from perfetto.common.query_result_iterator import QueryResultIterator
from perfetto.common.exceptions import PerfettoException

# C++ INT_MAX which is the maximum gRPC message size
MAX_MESSAGE_SIZE = 2147483647


class Bigtrace:
  """Client that sends SQL queries to a Bigtrace orchestrator over gRPC.

  The instance owns an insecure gRPC channel. Call `close()` when done, or
  use the instance as a context manager (`with Bigtrace(...) as bt:`) so the
  channel is released deterministically.
  """

  def __init__(self,
               orchestrator_address: str = "127.0.0.1:5051",
               wait_for_ready_for_testing: bool = False) -> None:
    """Open a channel to the orchestrator and create the service stub.

    Args:
      orchestrator_address: "host:port" target passed to
        `grpc.insecure_channel`.
      wait_for_ready_for_testing: Forwarded as `wait_for_ready` on every
        `Query` RPC issued by this client.
    """
    # NOTE(review): 'grpc.max_message_length' looks like a legacy option
    # name - confirm whether 'grpc.max_send_message_length' was intended.
    options = [('grpc.max_receive_message_length', MAX_MESSAGE_SIZE),
               ('grpc.max_message_length', MAX_MESSAGE_SIZE)]
    # Keep a reference to the channel so that it can be closed later.
    self._channel = grpc.insecure_channel(orchestrator_address, options=options)
    self.stub = BigtraceOrchestratorStub(self._channel)
    self.wait_for_ready_for_testing = wait_for_ready_for_testing

  def close(self) -> None:
    """Close the underlying gRPC channel."""
    self._channel.close()

  def __enter__(self) -> "Bigtrace":
    return self

  def __exit__(self, exc_type, exc_value, traceback) -> None:
    self.close()

  @staticmethod
  def _response_to_dataframe(response) -> pd.DataFrame:
    """Convert one non-empty query response into a DataFrame.

    Column names are taken from the first result of the response and the
    batches of all results are concatenated before conversion.
    """
    results = response.result
    repeated_batches = []
    for result in results:
      repeated_batches.extend(result.batch)
    iterator = QueryResultIterator(results[0].column_names, repeated_batches)
    df = iterator.as_pandas_dataframe()
    # TODO(b/366409021) Investigate whether this is the
    # best place to insert these addresses for performance
    df.insert(0, '_trace_address', response.trace)
    return df

  def query(self, traces: List[str], sql_query: str) -> pd.DataFrame:
    """Run `sql_query` against `traces` and return the combined rows.

    Args:
      traces: Non-empty list of trace addresses to query.
      sql_query: Non-empty SQL query string.

    Returns:
      A DataFrame with a fresh 0..n-1 index holding the rows of every
      response, each prefixed with a `_trace_address` column set to the
      response's trace. Empty if no response carried any result.

    Raises:
      PerfettoException: If `traces` or `sql_query` is empty, or if the RPC
        fails with a `grpc.RpcError` (the original error is chained as the
        cause).
    """
    if not traces:
      raise PerfettoException("Trace list cannot be empty")
    if not sql_query:
      raise PerfettoException("SQL query cannot be empty")

    # Query and then convert to pandas
    tables = []
    args = BigtraceQueryArgs(traces=traces, sql_query=sql_query)

    try:
      responses = self.stub.Query(
          args, wait_for_ready=self.wait_for_ready_for_testing)
      # The RPC is streamed, so errors can surface while iterating; keep the
      # loop inside the try block.
      for response in responses:
        # A response without results has no column names to read; skip it
        # instead of failing with an IndexError on results[0].
        if not response.result:
          continue
        tables.append(self._response_to_dataframe(response))
    except grpc.RpcError as e:
      raise PerfettoException(
          f"gRPC {e.code().name} error - {e.details()}") from e

    # pd.concat raises ValueError on an empty list, so handle it explicitly.
    if not tables:
      return pd.DataFrame()
    return pd.concat(tables).reset_index(drop=True)