xref: /aosp_15_r20/development/python-packages/adb/adb/__init__.py (revision 90c8c64db3049935a07c6143d7fd006e26f8ecca)
1#
2# Copyright (C) 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from __future__ import annotations
17
18import atexit
19import base64
20import logging
21import os
22import re
23import subprocess
24from typing import Any, Callable
25
26
class FindDeviceError(RuntimeError):
    """Base class for errors raised while looking up a device."""
29
30
class DeviceNotFoundError(FindDeviceError):
    """No device with the requested serial was found.

    Attributes:
        serial: The serial that was looked up.
    """

    def __init__(self, serial: str) -> None:
        message = f'No device with serial {serial}'
        super().__init__(message)
        self.serial = serial
36
37
class NoUniqueDeviceError(FindDeviceError):
    """A single device could not be identified unambiguously."""

    def __init__(self) -> None:
        super().__init__('No unique device')
41
42
class ShellError(RuntimeError):
    """Records a shell command together with its output and exit code.

    Attributes:
        cmd: The command, as a list of strings.
        stdout: Captured standard output of the command.
        stderr: Captured standard error of the command.
        exit_code: The exit status reported for the command.
    """

    def __init__(
        self, cmd: list[str], stdout: str, stderr: str, exit_code: int
    ) -> None:
        message = f'`{cmd}` exited with code {exit_code}'
        super().__init__(message)
        self.cmd, self.stdout = cmd, stdout
        self.stderr, self.exit_code = stderr, exit_code
53
54
def get_devices(adb_path: str = 'adb') -> list[str]:
    """Return the serials of attached devices that are not offline.

    Runs `adb start-server` first (discarding its output) and then parses
    the output of `adb devices`.

    Args:
        adb_path: The adb executable to invoke.

    Returns:
        The serial (first column) of every listed device whose state
        (second column) is not `offline`, in the order adb printed them.

    Raises:
        subprocess.CalledProcessError: An adb invocation exited non-zero.
    """
    subprocess.check_call([adb_path, 'start-server'],
                          stdout=subprocess.DEVNULL,
                          stderr=subprocess.DEVNULL)
    out = split_lines(
        subprocess.check_output([adb_path, 'devices']).decode('utf-8'))

    # The first line of `adb devices` just says "List of attached devices", so
    # skip that.
    devices = []
    for line in out[1:]:
        fields = line.split()
        # Blank lines, and lines without both a serial and a state column,
        # do not describe a device.
        if len(fields) < 2:
            continue
        serial, state = fields[0], fields[1]
        # Compare the state column itself rather than searching the whole
        # line, so that a serial containing "offline" is not dropped.
        if state == 'offline':
            continue
        devices.append(serial)
    return devices
74
75
def _get_unique_device(
    product: str | None = None, adb_path: str = 'adb'
) -> AndroidDevice:
    """Return the only attached device.

    Raises:
        NoUniqueDeviceError: The number of devices reported by
            get_devices() is not exactly one.
    """
    serials = get_devices(adb_path=adb_path)
    if len(serials) == 1:
        return AndroidDevice(serials[0], product, adb_path)
    raise NoUniqueDeviceError()
83
84
def _get_device_by_serial(
    serial: str, product: str | None = None, adb_path: str = 'adb'
) -> AndroidDevice:
    """Return an AndroidDevice for `serial` if get_devices() lists it.

    Raises:
        DeviceNotFoundError: `serial` is not among the listed devices.
    """
    if serial not in get_devices(adb_path=adb_path):
        raise DeviceNotFoundError(serial)
    return AndroidDevice(serial, product, adb_path)
92
93
def get_device(
    serial: str | None = None, product: str | None = None, adb_path: str = 'adb'
) -> AndroidDevice:
    """Get a uniquely identified AndroidDevice if one is available.

    The device is chosen by the first non-None identifier, in this order
    of preference:

    1) The `serial` argument.
    2) The environment variable $ANDROID_SERIAL.
    3) The single device connected to the system.

    Raises:
        DeviceNotFoundError:
            The serial specified by `serial` or $ANDROID_SERIAL is not
            connected.

        NoUniqueDeviceError:
            Neither `serial` nor $ANDROID_SERIAL was set, and the number of
            devices connected to the system is not 1. Having 0 connected
            devices will also result in this error.

    Returns:
        An AndroidDevice for the selected device.
    """
    # The environment is only consulted when no explicit serial was given.
    wanted = serial if serial is not None else os.getenv('ANDROID_SERIAL')
    if wanted is None:
        return _get_unique_device(product, adb_path=adb_path)
    return _get_device_by_serial(wanted, product, adb_path)
125
126
def _get_device_by_type(flag: str, adb_path: str) -> AndroidDevice:
    """Look up the device selected by an adb transport flag.

    Runs `adb start-server` (output discarded), then asks
    `adb <flag> get-serialno` for the serial and resolves it through
    _get_device_by_serial().

    Args:
        flag: Flag placed before `get-serialno` on the adb command line.
        adb_path: The adb executable to invoke.

    Raises:
        RuntimeError: `adb <flag> get-serialno` exited non-zero.
        NoUniqueDeviceError: adb reported the serial as `unknown`.
    """
    subprocess.check_call([adb_path, 'start-server'],
                          stdout=subprocess.DEVNULL,
                          stderr=subprocess.DEVNULL)
    try:
        raw = subprocess.check_output([adb_path, flag, 'get-serialno'])
    except subprocess.CalledProcessError:
        raise RuntimeError('adb unexpectedly returned nonzero')
    serial = raw.decode('utf-8').strip()
    if serial == 'unknown':
        raise NoUniqueDeviceError()
    return _get_device_by_serial(serial, adb_path=adb_path)
139
140
def get_usb_device(adb_path: str = 'adb') -> AndroidDevice:
    """Get the unique USB-connected AndroidDevice if it is available.

    Delegates to _get_device_by_type() with adb's `-d` flag.

    Raises:
        NoUniqueDeviceError:
            0 or multiple devices are connected via USB.

    Returns:
        An AndroidDevice associated with the unique USB-connected device.
    """
    usb_flag = '-d'
    return _get_device_by_type(usb_flag, adb_path)
152
153
def get_emulator_device(adb_path: str = 'adb') -> AndroidDevice:
    """Get the unique emulator AndroidDevice if it is available.

    Delegates to _get_device_by_type() with adb's `-e` flag.

    Raises:
        NoUniqueDeviceError:
            0 or multiple emulators are running.

    Returns:
        An AndroidDevice associated with the unique running emulator.
    """
    emulator_flag = '-e'
    return _get_device_by_type(emulator_flag, adb_path)
165
166
def split_lines(s: str) -> list[str]:
    r"""Split text into lines in a way that works on Windows and old devices.

    Windows will see \r\n instead of \n, old devices do the same, and old
    devices on Windows will see \r\r\n. Any run of \r and \n characters is
    therefore treated as a single line break.

    Trailing whitespace is removed before splitting, because re.split
    (unlike str.splitlines) would otherwise yield a trailing empty string:

    >>> 'foo\n'.splitlines()
    ['foo']
    >>> re.split(r'\n', 'foo\n')
    ['foo', '']
    """
    trimmed = s.rstrip()
    line_break = re.compile(r'[\r\n]+')
    return line_break.split(trimmed)
180
181
def version(adb_path: list[str] | None = None) -> int:
    """Get the version of adb (in terms of ADB_SERVER_VERSION).

    Args:
        adb_path: The adb command as an argument list; `version` is
            appended to it. Defaults to `['adb']`.

    Returns:
        N when the first line of output is exactly
        "Android Debug Bridge version 1.0.N"; 0 when the output is empty
        or its first line has any other form.

    Raises:
        subprocess.CalledProcessError: The adb command exited non-zero.
    """
    base_cmd = adb_path if adb_path is not None else ['adb']
    version_output = subprocess.check_output(
        base_cmd + ['version'], encoding='utf-8')
    lines = version_output.splitlines()
    if not lines:
        # No output at all: report "unknown" the same way as an
        # unrecognised banner instead of failing with IndexError.
        return 0
    # The dots are escaped so only a literal "1.0." prefix is accepted.
    pattern = r'^Android Debug Bridge version 1\.0\.(\d+)$'
    result = re.match(pattern, lines[0])
    if not result:
        return 0
    return int(result.group(1))
192
193
class AndroidDevice(object):
    """Runs adb commands against one device.

    Every command is built on `adb_cmd`, which is the adb executable
    followed by `-s <serial>` and `-p <product>` when those were given.

    Attributes:
        serial: Device serial passed to `-s`, or None.
        product: Product passed to `-p`, or None.
        adb_path: The adb executable to invoke.
        adb_cmd: Argument list prefix shared by all commands.
    """

    # Delimiter string to indicate the start of the exit code.
    _RETURN_CODE_DELIMITER = 'x'

    # Follow any shell command with this string to get the exit
    # status of a program since this isn't propagated by adb.
    #
    # The delimiter is needed because `printf 1; echo $?` would print
    # "10", and we wouldn't be able to distinguish the exit code.
    _RETURN_CODE_PROBE = [';', 'echo', '{0}$?'.format(_RETURN_CODE_DELIMITER)]

    # Maximum search distance from the output end to find the delimiter.
    # adb on Windows returns \r\n even if adbd returns \n. Some old devices
    # seem to actually return \r\r\n.
    _RETURN_CODE_SEARCH_LENGTH = len(
        '{0}255\r\r\n'.format(_RETURN_CODE_DELIMITER))

    def __init__(
        self, serial: str | None, product: str | None = None, adb_path: str = 'adb'
    ) -> None:
        """Store the device selectors and build the adb command prefix."""
        self.serial = serial
        self.product = product
        self.adb_path = adb_path
        self.adb_cmd = [adb_path]

        if self.serial is not None:
            self.adb_cmd.extend(['-s', self.serial])
        if self.product is not None:
            self.adb_cmd.extend(['-p', self.product])
        # Lazily populated caches for the `linesep` and `features` properties.
        self._linesep: str | None = None
        self._features: list[str] | None = None

    @property
    def linesep(self) -> str:
        """Output of `adb shell echo`, i.e. the line terminator adb returns.

        The value is fetched on first access and cached.
        """
        if self._linesep is None:
            # Read bytes and decode by hand: passing `encoding=` to
            # subprocess enables newline translation, which would turn
            # every terminator into '\n' and hide the real value.
            self._linesep = subprocess.check_output(
                self.adb_cmd + ['shell', 'echo']).decode('utf-8')
        return self._linesep

    @property
    def features(self) -> list[str]:
        """Lines of `adb features` output, cached after the first access.

        An empty list is cached if the command exits non-zero.
        """
        if self._features is None:
            try:
                self._features = split_lines(self._simple_call(['features']))
            except subprocess.CalledProcessError:
                self._features = []
        return self._features

    def has_shell_protocol(self) -> bool:
        """Whether adb is version 35 or newer and lists `shell_v2`."""
        return version(self.adb_cmd) >= 35 and 'shell_v2' in self.features

    def _make_shell_cmd(
        self, user_cmd: list[str], has_shell_protocol: bool | None = None
    ) -> list[str]:
        """Build the full `adb shell` argument list for `user_cmd`.

        Args:
            user_cmd: Command to run on the device, as a list of strings.
            has_shell_protocol: Result of has_shell_protocol() if the caller
                already has it; when None it is queried here.

        Returns:
            The argument list, with the exit-code probe appended when the
            shell protocol is not available.
        """
        if has_shell_protocol is None:
            has_shell_protocol = self.has_shell_protocol()
        command = self.adb_cmd + ['shell'] + user_cmd
        if not has_shell_protocol:
            command += self._RETURN_CODE_PROBE
        return command

    def _parse_shell_output(self, out: str) -> tuple[int, str]:
        """Finds the exit code string from shell output.

        Args:
            out: Shell output string.

        Returns:
            An (exit_code, output_string) tuple. The output string is
            cleaned of any additional stuff we appended to find the
            exit code.

        Raises:
            RuntimeError: Could not find the exit code in |out|.
        """
        search_text = out
        if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
            # We don't want to search over massive amounts of data when we know
            # the part we want is right at the end.
            search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
        _, delimiter, code_text = search_text.rpartition(
            self._RETURN_CODE_DELIMITER)
        if delimiter == '':
            raise RuntimeError('Could not find exit status in shell output.')
        try:
            # int() tolerates the trailing line terminator.
            result = int(code_text)
        except ValueError as err:
            # The delimiter character appeared in ordinary output rather than
            # in front of an exit code; report it as the documented error.
            raise RuntimeError(
                'Could not find exit status in shell output.') from err
        # The head of the partition won't contain the full text if
        # search_text was truncated, pull from the original string instead.
        out = out[:-len(delimiter) - len(code_text)]
        return result, out

    def _simple_call(self, cmd: list[str]) -> str:
        """Run `adb <cmd>` and return its output with stderr merged in.

        Raises:
            subprocess.CalledProcessError: adb exited non-zero.
        """
        logging.info(' '.join(self.adb_cmd + cmd))
        return subprocess.check_output(
            self.adb_cmd + cmd, stderr=subprocess.STDOUT).decode('utf-8')

    def shell(self, cmd: list[str]) -> tuple[str, str]:
        """Calls `adb shell`

        Args:
            cmd: command to execute as a list of strings.

        Returns:
            A (stdout, stderr) tuple. Stderr may be combined into stdout
            if the device doesn't support separate streams.

        Raises:
            ShellError: the exit code was non-zero.
        """
        exit_code, stdout, stderr = self.shell_nocheck(cmd)
        if exit_code != 0:
            raise ShellError(cmd, stdout, stderr, exit_code)
        return stdout, stderr

    def shell_nocheck(self, cmd: list[str]) -> tuple[int, str, str]:
        """Calls `adb shell`

        Args:
            cmd: command to execute as a list of strings.

        Returns:
            An (exit_code, stdout, stderr) tuple. Stderr may be combined
            into stdout if the device doesn't support separate streams.
        """
        # Query once and reuse: each query runs `adb version`, and the same
        # answer must drive both command construction and output parsing.
        uses_protocol = self.has_shell_protocol()
        command = self._make_shell_cmd(cmd, uses_protocol)
        logging.info(' '.join(command))
        p = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            encoding='utf-8')
        stdout, stderr = p.communicate()
        if uses_protocol:
            exit_code = p.returncode
        else:
            exit_code, stdout = self._parse_shell_output(stdout)
        return exit_code, stdout, stderr

    def shell_popen(
        self,
        cmd: list[str],
        kill_atexit: bool = True,
        preexec_fn: Callable[[], None] | None = None,
        creationflags: int = 0,
        **kwargs: Any,
    ) -> subprocess.Popen[Any]:
        """Calls `adb shell` and returns a handle to the adb process.

        This function provides direct access to the subprocess used to run the
        command, without special return code handling. Users that need the
        return value must retrieve it themselves.

        Args:
            cmd: Array of command arguments to execute.
            kill_atexit: Whether to kill the process upon exiting.
            preexec_fn: Argument forwarded to subprocess.Popen.
            creationflags: Argument forwarded to subprocess.Popen.
            **kwargs: Arguments forwarded to subprocess.Popen.

        Returns:
            subprocess.Popen handle to the adb shell instance
        """

        command = self.adb_cmd + ['shell'] + cmd

        # Make sure a ctrl-c in the parent script doesn't kill gdbserver.
        if os.name == 'nt':
            creationflags |= subprocess.CREATE_NEW_PROCESS_GROUP
        else:
            if preexec_fn is None:
                preexec_fn = os.setpgrp
            elif preexec_fn is not os.setpgrp:
                # Run the caller's hook first, then still move the child
                # into its own process group.
                fn = preexec_fn
                def _wrapper() -> None:
                    fn()
                    os.setpgrp()
                preexec_fn = _wrapper

        p = subprocess.Popen(command, creationflags=creationflags,
                             preexec_fn=preexec_fn, **kwargs)

        if kill_atexit:
            atexit.register(p.kill)

        return p

    def install(self, filename: str, replace: bool = False) -> str:
        """Run `adb install [-r] <filename>` and return its output.

        Args:
            filename: Package file passed to `adb install`.
            replace: If True, pass `-r`.
        """
        cmd = ['install']
        if replace:
            cmd.append('-r')
        cmd.append(filename)
        return self._simple_call(cmd)

    def push(
        self,
        local: str | list[str],
        remote: str,
        sync: bool = False,
        parameters: list[str] | None = None,
    ) -> str:
        """Transfer a local file or directory to the device.

        Args:
            local: The local file or directory to transfer, or a list of
                   them.
            remote: The remote path to which local should be transferred.
            sync: If True, only transfers files that are newer on the host than
                  those on the device. If False, transfers all files.
            parameters: Extra arguments inserted directly after `push`;
                  None (the default) adds nothing.

        Returns:
            Output of the command.
        """
        cmd = ['push']
        if parameters:
            cmd.extend(parameters)

        if sync:
            cmd.append('--sync')

        if isinstance(local, str):
            cmd.extend([local, remote])
        else:
            cmd.extend(local)
            cmd.append(remote)

        return self._simple_call(cmd)

    def pull(self, remote: str, local: str) -> str:
        """Run `adb pull <remote> <local>` and return its output."""
        return self._simple_call(['pull', remote, local])

    def sync(self, directory: str | None = None) -> str:
        """Run `adb sync [directory]` and return its output."""
        cmd = ['sync']
        if directory is not None:
            cmd.append(directory)
        return self._simple_call(cmd)

    def tcpip(self, port: str) -> str:
        """Run `adb tcpip <port>` and return its output."""
        return self._simple_call(['tcpip', port])

    def usb(self) -> str:
        """Run `adb usb` and return its output."""
        return self._simple_call(['usb'])

    def reboot(self) -> str:
        """Run `adb reboot` and return its output."""
        return self._simple_call(['reboot'])

    def remount(self) -> str:
        """Run `adb remount` and return its output."""
        return self._simple_call(['remount'])

    def root(self) -> str:
        """Run `adb root` and return its output."""
        return self._simple_call(['root'])

    def unroot(self) -> str:
        """Run `adb unroot` and return its output."""
        return self._simple_call(['unroot'])

    def connect(self, host: str) -> str:
        """Run `adb connect <host>` and return its output."""
        return self._simple_call(['connect', host])

    def disconnect(self, host: str) -> str:
        """Run `adb disconnect <host>` and return its output."""
        return self._simple_call(['disconnect', host])

    def forward(self, local: str, remote: str) -> str:
        """Run `adb forward <local> <remote>` and return its output."""
        return self._simple_call(['forward', local, remote])

    def forward_list(self) -> str:
        """Run `adb forward --list` and return its output."""
        return self._simple_call(['forward', '--list'])

    def forward_no_rebind(self, local: str, remote: str) -> str:
        """Run `adb forward --no-rebind <local> <remote>`; return its output."""
        return self._simple_call(['forward', '--no-rebind', local, remote])

    def forward_remove(self, local: str) -> str:
        """Run `adb forward --remove <local>` and return its output."""
        return self._simple_call(['forward', '--remove', local])

    def forward_remove_all(self) -> str:
        """Run `adb forward --remove-all` and return its output."""
        return self._simple_call(['forward', '--remove-all'])

    def reverse(self, remote: str, local: str) -> str:
        """Run `adb reverse <remote> <local>` and return its output."""
        return self._simple_call(['reverse', remote, local])

    def reverse_list(self) -> str:
        """Run `adb reverse --list` and return its output."""
        return self._simple_call(['reverse', '--list'])

    def reverse_no_rebind(self, local: str, remote: str) -> str:
        """Run `adb reverse --no-rebind` with both arguments; return output.

        NOTE(review): the parameters are named (local, remote) here but
        (remote, local) in reverse(). They are forwarded to adb positionally
        in the order given, so the first argument lands where reverse()
        puts `remote` - confirm the intended naming with callers.
        """
        return self._simple_call(['reverse', '--no-rebind', local, remote])

    def reverse_remove_all(self) -> str:
        """Run `adb reverse --remove-all` and return its output."""
        return self._simple_call(['reverse', '--remove-all'])

    def reverse_remove(self, remote: str) -> str:
        """Run `adb reverse --remove <remote>` and return its output."""
        return self._simple_call(['reverse', '--remove', remote])

    def wait(self) -> str:
        """Run `adb wait-for-device` and return its output."""
        return self._simple_call(['wait-for-device'])

    def get_prop(self, prop_name: str) -> str | None:
        """Read a property with `getprop` through shell().

        Returns:
            The single line of output, or None if that line is blank.

        Raises:
            RuntimeError: The output had more than one line.
            ShellError: `getprop` exited non-zero.
        """
        output = split_lines(self.shell(['getprop', prop_name])[0])
        if len(output) != 1:
            raise RuntimeError('Too many lines in getprop output:\n' +
                               '\n'.join(output))
        value = output[0]
        if not value.strip():
            return None
        return value

    def set_prop(self, prop_name: str, value: str) -> None:
        """Set a property with `setprop` through shell().

        Raises:
            ShellError: `setprop` exited non-zero.
        """
        self.shell(['setprop', prop_name, value])

    def logcat(self) -> str:
        """Returns the contents of logcat."""
        return self._simple_call(['logcat', '-d'])

    def clear_logcat(self) -> None:
        """Clears the logcat buffer."""
        self._simple_call(['logcat', '-c'])
490