xref: /aosp_15_r20/external/pigweed/pw_emu/py/pw_emu/frontend.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""User API"""
16
17import io
18import os
19import subprocess
20import tempfile
21
22from pathlib import Path
23from typing import Any
24
25from pw_emu.core import (
26    AlreadyRunning,
27    Config,
28    ConfigError,
29    Connector,
30    Launcher,
31    InvalidEmulator,
32    InvalidChannelType,
33    NotRunning,
34)
35
36
37class Emulator:
38    """Launches, controls and interacts with an emulator instance."""
39
40    def __init__(self, wdir: Path, config_path: Path | None = None) -> None:
41        self._wdir = wdir
42        self._config_path = config_path
43        self._connector: Connector | None = None
44        self._launcher: Launcher | None = None
45
46    def _get_launcher(self, target: str) -> Launcher:
47        """Returns an emulator for a given target.
48
49        If there are multiple emulators for the same target it will return
50        an arbitrary emulator launcher.
51        """
52        config = Config(self._config_path)
53        target_config = config.get(
54            ['targets', target],
55            optional=False,
56            entry_type=dict,
57        )
58        for key in target_config.keys():
59            try:
60                return Launcher.get(key, self._config_path)
61            except InvalidEmulator:
62                pass
63        raise ConfigError(
64            self._config_path,
65            f'could not determine emulator for target `{target}`',
66        )
67
68    def start(
69        self,
70        target: str,
71        file: Path | None = None,
72        pause: bool = False,
73        debug: bool = False,
74        foreground: bool = False,
75        args: str | None = None,
76    ) -> None:
77        """Starts the emulator for the given ``target``.
78
79        If ``file`` is set the emulator loads the file before starting.
80
81        If ``pause`` is ``True`` the emulator pauses until the debugger is
82        connected.
83
84        If ``debug`` is ``True`` the emulator runs in the foreground with debug
85        output enabled. This is useful for seeing errors, traces, etc.
86
87        If ``foreground`` is ``True`` the emulator runs in the foreground,
88        otherwise it starts in daemon mode. Foreground mode is useful when
89        there is another process controlling the emulator's life cycle,
90        e.g. cuttlefish.
91
92        ``args`` are passed directly to the emulator.
93
94        """
95        if self._connector:
96            raise AlreadyRunning(self._wdir)
97
98        if self._launcher is None:
99            self._launcher = self._get_launcher(target)
100        self._connector = self._launcher.start(
101            wdir=self._wdir,
102            target=target,
103            file=file,
104            pause=pause,
105            debug=debug,
106            foreground=foreground,
107            args=args,
108        )
109
110    def _c(self) -> Connector:
111        if self._connector is None:
112            self._connector = Connector.get(self._wdir)
113            if not self.running():
114                raise NotRunning(self._wdir)
115        return self._connector
116
117    def running(self) -> bool:
118        """Checks if the main emulator process is already running."""
119
120        try:
121            return self._c().running()
122        except NotRunning:
123            return False
124
125    def _path(self, name: Path | str) -> Path | str:
126        """Returns the full path for a given emulator file."""
127
128        return os.path.join(self._wdir, name)
129
130    def stop(self):
131        """Stop the emulator."""
132
133        return self._c().stop()
134
135    def get_gdb_remote(self) -> str:
136        """Returns a string that can be passed to the target remote ``gdb``
137        command.
138
139        """
140
141        chan_type = self._c().get_channel_type('gdb')
142
143        if chan_type == 'tcp':
144            host, port = self._c().get_channel_addr('gdb')
145            return f'{host}:{port}'
146
147        if chan_type == 'pty':
148            return self._c().get_channel_path('gdb')
149
150        raise InvalidChannelType(chan_type)
151
152    def get_gdb_cmd(self) -> list[str]:
153        """Returns the ``gdb`` command for current target."""
154        return self._c().get_gdb_cmd()
155
156    def run_gdb_cmds(
157        self,
158        commands: list[str],
159        executable: str | None = None,
160        pause: bool = False,
161    ) -> subprocess.CompletedProcess:
162        """Connects to the target and runs the given commands silently
163        in batch mode.
164
165        ``executable`` is optional but may be required by some ``gdb`` commands.
166
167        If ``pause`` is set, execution stops after running the given commands.
168
169        """
170
171        cmd = self._c().get_gdb_cmd().copy()
172        if not cmd:
173            raise ConfigError(self._c().get_config_path(), 'gdb not configured')
174
175        cmd.append('-batch-silent')
176        cmd.append('-ex')
177        cmd.append(f'target remote {self.get_gdb_remote()}')
178        for gdb_cmd in commands:
179            cmd.append('-ex')
180            cmd.append(gdb_cmd)
181        if pause:
182            cmd.append('-ex')
183            cmd.append('disconnect')
184        if executable:
185            cmd.append(executable)
186        return subprocess.run(cmd, capture_output=True)
187
188    def reset(self) -> None:
189        """Performs a software reset."""
190        self._c().reset()
191
192    def list_properties(self, path: str) -> list[dict]:
193        """Returns the property list for an emulator object.
194
195        The object is identified by a full path. The path is
196        target-specific and the format of the path is backend-specific.
197
198        QEMU path example: ``/machine/unattached/device[10]``
199
200        renode path example: ``sysbus.uart``
201
202        """
203        return self._c().list_properties(path)
204
205    def set_property(self, path: str, prop: str, value: Any) -> None:
206        """Sets the value of an emulator's object property."""
207
208        self._c().set_property(path, prop, value)
209
210    def get_property(self, path: str, prop: str) -> Any:
211        """Returns the value of an emulator's object property."""
212
213        return self._c().get_property(path, prop)
214
215    def get_channel_type(self, name: str) -> str:
216        """Returns the channel type
217
218        Currently ``pty`` and ``tcp`` are the only supported types.
219
220        """
221
222        return self._c().get_channel_type(name)
223
224    def get_channel_path(self, name: str) -> str:
225        """Returns the channel path. Raises ``InvalidChannelType`` if this
226        is not a ``pty`` channel.
227
228        """
229
230        return self._c().get_channel_path(name)
231
232    def get_channel_addr(self, name: str) -> tuple:
233        """Returns a pair of ``(host, port)`` for the channel. Raises
234        ``InvalidChannelType`` if this is not a TCP channel.
235
236        """
237
238        return self._c().get_channel_addr(name)
239
240    def get_channel_stream(
241        self,
242        name: str,
243        timeout: float | None = None,
244    ) -> io.RawIOBase:
245        """Returns a file object for a given host-exposed device.
246
247        If ``timeout`` is ``None`` than reads and writes are blocking. If
248        ``timeout`` is ``0`` the stream is operating in non-blocking
249        mode. Otherwise read and write will timeout after the given
250        value.
251
252        """
253
254        return self._c().get_channel_stream(name, timeout)
255
256    def get_channels(self) -> list[str]:
257        """Returns the list of available channels."""
258
259        return self._c().get_channels()
260
261    def set_emu(self, emu: str) -> None:
262        """Sets the emulator type for this instance."""
263
264        self._launcher = Launcher.get(emu, self._config_path)
265
266    def cont(self) -> None:
267        """Resumes the emulator's execution."""
268
269        self._c().cont()
270
271
272class TemporaryEmulator(Emulator):
273    """Temporary emulator instances.
274
275    Manages emulator instances that run in temporary working
276    directories. The emulator instance is stopped and the working
277    directory is cleared when the ``with`` block completes.
278
279    It also supports interoperability with the ``pw_emu`` cli, e.g. starting the
280    emulator with the CLI and then controlling it from the Python API.
281
282    Usage example:
283
284    .. code-block:: python
285
286        # programatically start and load an executable then access it
287        with TemporaryEmulator() as emu:
288            emu.start(target, file)
289            with emu.get_channel_stream(chan) as stream:
290                ...
291
292    .. code-block:: python
293
294        # or start it form the command line then access it
295        with TemporaryEmulator() as emu:
296            build.bazel(
297                ctx,
298                "run",
299                exec_path,
300                "--run_under=pw emu start <target> --file "
301            )
302            with emu.get_channel_stream(chan) as stream:
303                ...
304
305    """
306
307    def __init__(
308        self,
309        config_path: Path | None = None,
310        cleanup: bool = True,
311    ) -> None:
312        self._temp = tempfile.TemporaryDirectory(prefix="pw_emu_")
313        self._cleanup = cleanup
314        super().__init__(Path(self._temp.name), config_path)
315
316    def __enter__(self):
317        # Interoperability with pw emu cli.
318        os.environ["PW_EMU_WDIR"] = str(self._wdir)
319        return self
320
321    def __exit__(self, exc, value, traceback) -> None:
322        self.stop()
323        del os.environ["PW_EMU_WDIR"]
324        if self._cleanup:
325            self._temp.cleanup()
326