1#!/usr/bin/env python 2# Copyright 2023 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""User API""" 16 17import io 18import os 19import subprocess 20import tempfile 21 22from pathlib import Path 23from typing import Any 24 25from pw_emu.core import ( 26 AlreadyRunning, 27 Config, 28 ConfigError, 29 Connector, 30 Launcher, 31 InvalidEmulator, 32 InvalidChannelType, 33 NotRunning, 34) 35 36 37class Emulator: 38 """Launches, controls and interacts with an emulator instance.""" 39 40 def __init__(self, wdir: Path, config_path: Path | None = None) -> None: 41 self._wdir = wdir 42 self._config_path = config_path 43 self._connector: Connector | None = None 44 self._launcher: Launcher | None = None 45 46 def _get_launcher(self, target: str) -> Launcher: 47 """Returns an emulator for a given target. 48 49 If there are multiple emulators for the same target it will return 50 an arbitrary emulator launcher. 51 """ 52 config = Config(self._config_path) 53 target_config = config.get( 54 ['targets', target], 55 optional=False, 56 entry_type=dict, 57 ) 58 for key in target_config.keys(): 59 try: 60 return Launcher.get(key, self._config_path) 61 except InvalidEmulator: 62 pass 63 raise ConfigError( 64 self._config_path, 65 f'could not determine emulator for target `{target}`', 66 ) 67 68 def start( 69 self, 70 target: str, 71 file: Path | None = None, 72 pause: bool = False, 73 debug: bool = False, 74 foreground: bool = False, 75 args: str | None = None, 76 ) -> None: 77 """Starts the emulator for the given ``target``. 78 79 If ``file`` is set the emulator loads the file before starting. 80 81 If ``pause`` is ``True`` the emulator pauses until the debugger is 82 connected. 83 84 If ``debug`` is ``True`` the emulator runs in the foreground with debug 85 output enabled. This is useful for seeing errors, traces, etc. 86 87 If ``foreground`` is ``True`` the emulator runs in the foreground, 88 otherwise it starts in daemon mode. Foreground mode is useful when 89 there is another process controlling the emulator's life cycle, 90 e.g. cuttlefish. 91 92 ``args`` are passed directly to the emulator. 93 94 """ 95 if self._connector: 96 raise AlreadyRunning(self._wdir) 97 98 if self._launcher is None: 99 self._launcher = self._get_launcher(target) 100 self._connector = self._launcher.start( 101 wdir=self._wdir, 102 target=target, 103 file=file, 104 pause=pause, 105 debug=debug, 106 foreground=foreground, 107 args=args, 108 ) 109 110 def _c(self) -> Connector: 111 if self._connector is None: 112 self._connector = Connector.get(self._wdir) 113 if not self.running(): 114 raise NotRunning(self._wdir) 115 return self._connector 116 117 def running(self) -> bool: 118 """Checks if the main emulator process is already running.""" 119 120 try: 121 return self._c().running() 122 except NotRunning: 123 return False 124 125 def _path(self, name: Path | str) -> Path | str: 126 """Returns the full path for a given emulator file.""" 127 128 return os.path.join(self._wdir, name) 129 130 def stop(self): 131 """Stop the emulator.""" 132 133 return self._c().stop() 134 135 def get_gdb_remote(self) -> str: 136 """Returns a string that can be passed to the target remote ``gdb`` 137 command. 138 139 """ 140 141 chan_type = self._c().get_channel_type('gdb') 142 143 if chan_type == 'tcp': 144 host, port = self._c().get_channel_addr('gdb') 145 return f'{host}:{port}' 146 147 if chan_type == 'pty': 148 return self._c().get_channel_path('gdb') 149 150 raise InvalidChannelType(chan_type) 151 152 def get_gdb_cmd(self) -> list[str]: 153 """Returns the ``gdb`` command for current target.""" 154 return self._c().get_gdb_cmd() 155 156 def run_gdb_cmds( 157 self, 158 commands: list[str], 159 executable: str | None = None, 160 pause: bool = False, 161 ) -> subprocess.CompletedProcess: 162 """Connects to the target and runs the given commands silently 163 in batch mode. 164 165 ``executable`` is optional but may be required by some ``gdb`` commands. 166 167 If ``pause`` is set, execution stops after running the given commands. 168 169 """ 170 171 cmd = self._c().get_gdb_cmd().copy() 172 if not cmd: 173 raise ConfigError(self._c().get_config_path(), 'gdb not configured') 174 175 cmd.append('-batch-silent') 176 cmd.append('-ex') 177 cmd.append(f'target remote {self.get_gdb_remote()}') 178 for gdb_cmd in commands: 179 cmd.append('-ex') 180 cmd.append(gdb_cmd) 181 if pause: 182 cmd.append('-ex') 183 cmd.append('disconnect') 184 if executable: 185 cmd.append(executable) 186 return subprocess.run(cmd, capture_output=True) 187 188 def reset(self) -> None: 189 """Performs a software reset.""" 190 self._c().reset() 191 192 def list_properties(self, path: str) -> list[dict]: 193 """Returns the property list for an emulator object. 194 195 The object is identified by a full path. The path is 196 target-specific and the format of the path is backend-specific. 197 198 QEMU path example: ``/machine/unattached/device[10]`` 199 200 renode path example: ``sysbus.uart`` 201 202 """ 203 return self._c().list_properties(path) 204 205 def set_property(self, path: str, prop: str, value: Any) -> None: 206 """Sets the value of an emulator's object property.""" 207 208 self._c().set_property(path, prop, value) 209 210 def get_property(self, path: str, prop: str) -> Any: 211 """Returns the value of an emulator's object property.""" 212 213 return self._c().get_property(path, prop) 214 215 def get_channel_type(self, name: str) -> str: 216 """Returns the channel type 217 218 Currently ``pty`` and ``tcp`` are the only supported types. 219 220 """ 221 222 return self._c().get_channel_type(name) 223 224 def get_channel_path(self, name: str) -> str: 225 """Returns the channel path. Raises ``InvalidChannelType`` if this 226 is not a ``pty`` channel. 227 228 """ 229 230 return self._c().get_channel_path(name) 231 232 def get_channel_addr(self, name: str) -> tuple: 233 """Returns a pair of ``(host, port)`` for the channel. Raises 234 ``InvalidChannelType`` if this is not a TCP channel. 235 236 """ 237 238 return self._c().get_channel_addr(name) 239 240 def get_channel_stream( 241 self, 242 name: str, 243 timeout: float | None = None, 244 ) -> io.RawIOBase: 245 """Returns a file object for a given host-exposed device. 246 247 If ``timeout`` is ``None`` than reads and writes are blocking. If 248 ``timeout`` is ``0`` the stream is operating in non-blocking 249 mode. Otherwise read and write will timeout after the given 250 value. 251 252 """ 253 254 return self._c().get_channel_stream(name, timeout) 255 256 def get_channels(self) -> list[str]: 257 """Returns the list of available channels.""" 258 259 return self._c().get_channels() 260 261 def set_emu(self, emu: str) -> None: 262 """Sets the emulator type for this instance.""" 263 264 self._launcher = Launcher.get(emu, self._config_path) 265 266 def cont(self) -> None: 267 """Resumes the emulator's execution.""" 268 269 self._c().cont() 270 271 272class TemporaryEmulator(Emulator): 273 """Temporary emulator instances. 274 275 Manages emulator instances that run in temporary working 276 directories. The emulator instance is stopped and the working 277 directory is cleared when the ``with`` block completes. 278 279 It also supports interoperability with the ``pw_emu`` cli, e.g. starting the 280 emulator with the CLI and then controlling it from the Python API. 281 282 Usage example: 283 284 .. code-block:: python 285 286 # programatically start and load an executable then access it 287 with TemporaryEmulator() as emu: 288 emu.start(target, file) 289 with emu.get_channel_stream(chan) as stream: 290 ... 291 292 .. code-block:: python 293 294 # or start it form the command line then access it 295 with TemporaryEmulator() as emu: 296 build.bazel( 297 ctx, 298 "run", 299 exec_path, 300 "--run_under=pw emu start <target> --file " 301 ) 302 with emu.get_channel_stream(chan) as stream: 303 ... 304 305 """ 306 307 def __init__( 308 self, 309 config_path: Path | None = None, 310 cleanup: bool = True, 311 ) -> None: 312 self._temp = tempfile.TemporaryDirectory(prefix="pw_emu_") 313 self._cleanup = cleanup 314 super().__init__(Path(self._temp.name), config_path) 315 316 def __enter__(self): 317 # Interoperability with pw emu cli. 318 os.environ["PW_EMU_WDIR"] = str(self._wdir) 319 return self 320 321 def __exit__(self, exc, value, traceback) -> None: 322 self.stop() 323 del os.environ["PW_EMU_WDIR"] 324 if self._cleanup: 325 self._temp.cleanup() 326