xref: /aosp_15_r20/system/extras/torq/device.py (revision 288bf5226967eb3dac5cce6c939ccc2a7f2b4fe5)
1#
2# Copyright (C) 2024 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import math
18import os
19import subprocess
20import time
21
22from validation_error import ValidationError
23
24ADB_ROOT_TIMED_OUT_LIMIT_SECS = 5
25ADB_BOOT_COMPLETED_TIMED_OUT_LIMIT_SECS = 30
26POLLING_INTERVAL_SECS = 0.5
27SIMPLEPERF_TRACE_FILE = "/data/misc/perfetto-traces/perf.data"
28
29class AdbDevice:
30  """
31  Class representing a device. APIs interact with the current device through
32  the adb bridge.
33  """
34  def __init__(self, serial):
35    self.serial = serial
36
37  @staticmethod
38  def get_adb_devices():
39    """
40    Returns a list of devices connected to the adb bridge.
41    The output of the command 'adb devices' is expected to be of the form:
42    List of devices attached
43    SOMEDEVICE1234    device
44    device2:5678    device
45    """
46    command_output = subprocess.run(["adb", "devices"], capture_output=True)
47    output_lines = command_output.stdout.decode("utf-8").split("\n")
48    devices = []
49    for line in output_lines[:-2]:
50      if line[0] == "*" or line == "List of devices attached":
51        continue
52      words_in_line = line.split('\t')
53      if words_in_line[1] == "device":
54        devices.append(words_in_line[0])
55    return devices
56
57  def check_device_connection(self):
58    devices = self.get_adb_devices()
59    if len(devices) == 0:
60      return ValidationError("There are currently no devices connected.", None)
61    if self.serial is not None:
62      if self.serial not in devices:
63        return ValidationError(("Device with serial %s is not connected."
64                                % self.serial), None)
65    elif "ANDROID_SERIAL" in os.environ:
66      if os.environ["ANDROID_SERIAL"] not in devices:
67        return ValidationError(("Device with serial %s is set as environment"
68                                " variable, ANDROID_SERIAL, but is not"
69                                " connected."
70                                % os.environ["ANDROID_SERIAL"]), None)
71      self.serial = os.environ["ANDROID_SERIAL"]
72    elif len(devices) == 1:
73      self.serial = devices[0]
74    else:
75      return ValidationError(("There is more than one device currently"
76                              " connected."),
77                             ("Run one of the following commands to choose one"
78                              " of the connected devices:\n\t torq --serial %s"
79                              % "\n\t torq --serial ".join(devices)))
80    return None
81
82  @staticmethod
83  def poll_is_task_completed(timed_out_limit, interval, check_is_completed):
84    start_time = time.time()
85    while True:
86      time.sleep(interval)
87      if check_is_completed():
88        return True
89      if time.time() - start_time > timed_out_limit:
90        return False
91
92  def root_device(self):
93    subprocess.run(["adb", "-s", self.serial, "root"])
94    if not self.poll_is_task_completed(ADB_ROOT_TIMED_OUT_LIMIT_SECS,
95                                       POLLING_INTERVAL_SECS,
96                                       lambda: self.serial in
97                                               self.get_adb_devices()):
98      raise Exception(("Device with serial %s took too long to reconnect after"
99                       " being rooted." % self.serial))
100
101  def remove_file(self, file_path):
102    subprocess.run(["adb", "-s", self.serial, "shell", "rm", "-f", file_path])
103
104  def start_perfetto_trace(self, config):
105    return subprocess.Popen(("adb -s %s shell perfetto -c - --txt -o"
106                             " /data/misc/perfetto-traces/"
107                             "trace.perfetto-trace %s"
108                             % (self.serial, config)), shell=True)
109
110  def start_simpleperf_trace(self, command):
111    events_param = "-e " + ",".join(command.simpleperf_event)
112    return subprocess.Popen(("adb -s %s shell simpleperf record -a -f 1000 "
113                             "--post-unwind=yes -m 8192 -g --duration %d"
114                             " %s -o %s"
115                             % (self.serial,
116                                int(math.ceil(command.dur_ms/1000)),
117                                events_param, SIMPLEPERF_TRACE_FILE)),
118                            shell=True)
119
120  def pull_file(self, file_path, host_file):
121    subprocess.run(["adb", "-s", self.serial, "pull", file_path, host_file])
122
123  def get_all_users(self):
124    command_output = subprocess.run(["adb", "-s", self.serial, "shell", "pm",
125                                     "list", "users"], capture_output=True)
126    output_lines = command_output.stdout.decode("utf-8").split("\n")[1:-1]
127    return [int((line.split("{", 1)[1]).split(":", 1)[0]) for line in
128            output_lines]
129
130  def user_exists(self, user):
131    users = self.get_all_users()
132    if user not in users:
133      return ValidationError(("User ID %s does not exist on device with serial"
134                              " %s." % (user, self.serial)),
135                             ("Select from one of the following user IDs on"
136                              " device with serial %s: %s"
137                              % (self.serial, ", ".join(map(str, users)))))
138    return None
139
140  def get_current_user(self):
141    command_output = subprocess.run(["adb", "-s", self.serial, "shell", "am",
142                                     "get-current-user"], capture_output=True)
143    return int(command_output.stdout.decode("utf-8").split()[0])
144
145  def perform_user_switch(self, user):
146    subprocess.run(["adb", "-s", self.serial, "shell", "am", "switch-user",
147                    str(user)])
148
149  def write_to_file(self, file_path, host_file_string):
150    subprocess.run(("adb -s %s shell 'cat > %s %s'"
151                    % (self.serial, file_path, host_file_string)), shell=True)
152
153  def set_prop(self, prop, value):
154    subprocess.run(["adb", "-s", self.serial, "shell", "setprop", prop, value])
155
156  def reboot(self):
157    subprocess.run(["adb", "-s", self.serial, "reboot"])
158    if not self.poll_is_task_completed(ADB_ROOT_TIMED_OUT_LIMIT_SECS,
159                                       POLLING_INTERVAL_SECS,
160                                       lambda: self.serial not in
161                                               self.get_adb_devices()):
162      raise Exception(("Device with serial %s took too long to start"
163                       " rebooting." % self.serial))
164
165  def wait_for_device(self):
166    subprocess.run(["adb", "-s", self.serial, "wait-for-device"])
167
168  def is_boot_completed(self):
169    command_output = subprocess.run(["adb", "-s", self.serial, "shell",
170                                     "getprop", "sys.boot_completed"],
171                                    capture_output=True)
172    return command_output.stdout.decode("utf-8").strip() == "1"
173
174  def wait_for_boot_to_complete(self):
175    if not self.poll_is_task_completed(ADB_BOOT_COMPLETED_TIMED_OUT_LIMIT_SECS,
176                                       POLLING_INTERVAL_SECS,
177                                       self.is_boot_completed):
178      raise Exception(("Device with serial %s took too long to finish"
179                       " rebooting." % self.serial))
180
181  def get_packages(self):
182    return [package.removeprefix("package:") for package in subprocess.run(
183        ["adb", "-s", self.serial, "shell", "pm", "list", "packages"],
184        capture_output=True).stdout.decode("utf-8").splitlines()]
185
186  def get_pid(self, package):
187    return subprocess.run("adb -s %s shell pidof %s" % (self.serial, package),
188                          shell=True, capture_output=True
189                          ).stdout.decode("utf-8").split("\n")[0]
190
191  def is_package_running(self, package):
192    return self.get_pid(package) != ""
193
194  def start_package(self, package):
195    if subprocess.run(
196        ["adb", "-s", self.serial, "shell", "am", "start", package],
197        capture_output=True).stderr.decode("utf-8").split("\n")[0] != "":
198      return ValidationError(("Cannot start package %s on device with"
199                              " serial %s because %s is a service package,"
200                              " which doesn't implement a MAIN activity."
201                              % (package, self.serial, package)), None)
202    return None
203
204  def kill_pid(self, package):
205    pid = self.get_pid(package)
206    if pid != "":
207      subprocess.run(["adb", "-s", self.serial, "shell", "kill", "-9", pid])
208
209  def force_stop_package(self, package):
210    subprocess.run(["adb", "-s", self.serial, "shell", "am", "force-stop",
211                    package])
212
213  def get_prop(self, prop):
214    return subprocess.run(
215        ["adb", "-s", self.serial, "shell", "getprop", prop],
216        capture_output=True).stdout.decode("utf-8").split("\n")[0]
217
218  def get_android_sdk_version(self):
219    return int(self.get_prop("ro.build.version.sdk"))
220
221  def simpleperf_event_exists(self, simpleperf_events):
222    events_copy = simpleperf_events.copy()
223    grep_command = "grep"
224    for event in simpleperf_events:
225      grep_command += " -e " + event.lower()
226
227    output = subprocess.run(["adb", "-s", self.serial, "shell",
228                             "simpleperf", "list", "|", grep_command],
229                            capture_output=True)
230
231    if output is None or len(output.stdout) == 0:
232      raise Exception("Error while validating simpleperf events.")
233    lines = output.stdout.decode("utf-8").split("\n")
234
235    # Anything that does not start with two spaces is not a command.
236    # Any command with a space will have the command before the first space.
237    for line in lines:
238      if len(line) <= 3 or line[:2] != "  " or line[2] == "#":
239        # Line doesn't contain a simpleperf event
240        continue
241      event = line[2:].split(" ")[0]
242      if event in events_copy:
243        events_copy.remove(event)
244        if len(events_copy) == 0:
245          # All of the events exist, exit early
246          break
247
248    if len(events_copy) > 0:
249      return ValidationError("The following simpleperf event(s) are invalid:"
250                             " %s."
251                             % events_copy,
252                             "Run adb shell simpleperf list to"
253                             " see valid simpleperf events.")
254    return None
255