xref: /aosp_15_r20/external/pigweed/pw_emu/py/pw_emu/renode.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Pigweed renode frontend."""
15
16import socket
17import time
18import xmlrpc.client
19
20from pathlib import Path
21from typing import Any
22
23from pw_emu.core import (
24    Connector,
25    Error,
26    Handles,
27    InvalidChannelType,
28    Launcher,
29    RunError,
30    WrongEmulator,
31)
32
33
class RenodeRobotError(Error):
    """Raised when a request sent over the Renode robot interface fails."""

    def __init__(self, err: str) -> None:
        """Create the exception.

        Args:
          err: Description of the failure; forwarded unchanged to the base
            class constructor.
        """
        super().__init__(err)
39
40
class RenodeLauncher(Launcher):
    """Start a new renode process for a given target and config file."""

    def __init__(self, config_path: Path | None = None):
        """Create a launcher for the 'renode' emulator.

        Args:
          config_path: Optional path of the configuration file, forwarded to
            the base class.
        """
        super().__init__('renode', config_path)
        # Command line accumulated by _pre_start().
        self._start_cmd: list[str] = []

    @staticmethod
    def _allocate_port() -> int:
        """Allocate a renode port.

        This is inherently racy but renode currently does not have proper
        support for dynamic ports. It accepts 0 as a port and the OS allocates
        a dynamic port but there is no API to retrieve the port.

        Returns:
          A TCP port number that the OS handed out for 'localhost'. The probe
          socket is closed before returning, so another process may grab the
          port before renode binds it.
        """
        # The context manager closes the socket even if bind() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(('localhost', 0))
            return sock.getsockname()[1]

    def _pre_start(
        self,
        target: str,
        file: Path | None = None,
        pause: bool = False,
        debug: bool = False,
        args: str | None = None,
    ) -> list[str]:
        """Build the renode command line and register the channels.

        Args:
          target: Target name; used as the name of the renode machine.
          file: Optional ELF file to load with `sysbus LoadELF`.
          pause: When True, the final `start` command is omitted.
          debug: Not used by this method.
          args: Not used by this method.

        Returns:
          The command line (self._start_cmd) used to start renode.

        Raises:
          InvalidChannelType: A terminal type is neither 'tcp' nor 'pty'.
        """
        # A target specific executable takes precedence over the emulator
        # wide setting.
        renode = self._config.get_target_emu(['executable'])
        if not renode:
            renode = self._config.get_emu(['executable'], optional=False)

        self._start_cmd.extend([f'{renode}', '--disable-xwt'])
        port = self._allocate_port()
        self._start_cmd.extend(['--robot-server-port', str(port)])
        self._handles.add_channel_tcp('robot', 'localhost', port)

        machine = self._config.get_target_emu(['machine'], optional=False)
        self._start_cmd.extend(['--execute', f'mach add "{target}"'])

        self._start_cmd.extend(
            ['--execute', f'machine LoadPlatformDescription @{machine}']
        )

        terms = self._config.get_target_emu(
            ['channels', 'terminals'], entry_type=dict
        )
        for name in terms:
            dev_path = self._config.get_target_emu(
                ['channels', 'terminals', name, 'device-path'],
                optional=False,
                entry_type=str,
            )
            # Terminal type lookup order: per terminal setting, emulator wide
            # setting, then 'tcp'.
            term_type = self._config.get_target_emu(
                ['channels', 'terminals', name, 'type'],
                entry_type=str,
            )
            if not term_type:
                term_type = self._config.get_emu(
                    ['channels', 'terminals', 'type'],
                    entry_type=str,
                )
            if not term_type:
                term_type = 'tcp'

            cmd = 'emulation '
            if term_type == 'tcp':
                # Only tcp terminals need a port; pty terminals use a path.
                port = self._allocate_port()
                cmd += f'CreateServerSocketTerminal {port} "{name}" false'
                self._handles.add_channel_tcp(name, 'localhost', port)
            elif term_type == 'pty':
                path = self._path(name)
                cmd += f'CreateUartPtyTerminal "{name}" "{path}"'
                self._handles.add_channel_pty(name, str(path))
            else:
                raise InvalidChannelType(term_type)

            self._start_cmd.extend(['--execute', cmd])
            self._start_cmd.extend(
                ['--execute', f'connector Connect {dev_path} {name}']
            )

        port = self._allocate_port()
        self._start_cmd.extend(['--execute', f'machine StartGdbServer {port}'])
        self._handles.add_channel_tcp('gdb', 'localhost', port)

        if file:
            self._start_cmd.extend(['--execute', f'sysbus LoadELF @{file}'])

        if not pause:
            self._start_cmd.extend(['--execute', 'start'])

        return self._start_cmd

    def _post_start(self) -> None:
        """Wait until the robot server accepts TCP connections.

        Tries to connect to the 'robot' channel once per second for up to
        120 seconds.

        Raises:
          RunError: No connection could be established before the deadline.
        """
        # Probe the robot channel: it is the one RenodeConnector talks to and
        # the one the error message below refers to.
        robot = self._handles.channels['robot']
        assert isinstance(robot, Handles.TcpChannel)

        # renode is slow to start especially during host load
        deadline = time.monotonic() + 120
        err: OSError | None = None
        while time.monotonic() < deadline:
            try:
                # Use a fresh socket per attempt: the state of a socket after
                # a failed connect() is unspecified. The context manager
                # closes it on both success and failure.
                with socket.socket(
                    socket.AF_INET, socket.SOCK_STREAM
                ) as sock:
                    sock.connect((robot.host, robot.port))
                return
            except OSError as exc:
                err = exc
            time.sleep(1)

        msg = 'failed to connect to robot channel'
        msg += f'({robot.host}:{robot.port}): {err}'
        raise RunError('renode', msg)

    def _get_connector(self, wdir: Path) -> Connector:
        """Return a RenodeConnector for the given working directory."""
        return RenodeConnector(wdir)
165
166
class RenodeConnector(Connector):
    """Connector that drives renode through its robot XML-RPC endpoint."""

    def __init__(self, wdir: Path) -> None:
        """Attach to the emulator instance described by `wdir`.

        Args:
          wdir: Working directory, forwarded to the base class.

        Raises:
          WrongEmulator: The instance was not started with renode.
        """
        super().__init__(wdir)
        if self.get_emu() != 'renode':
            raise WrongEmulator('renode', self.get_emu())
        channel = self._handles.channels['robot']
        url = f'http://{channel.host}:{channel.port}/'
        self._proxy = xmlrpc.client.ServerProxy(url)

    def _request(self, cmd: str, args: list[str]) -> Any:
        """Run a keyword through the robot interface and return its result.

        The robot interface is designed for testing, so it is not an ideal
        fit. It is still preferred over the telnet interface, whose output is
        ANSI colored, echoed and mixed with logs.

        Args:
          cmd: Keyword to run.
          args: Arguments for the keyword.

        Returns:
          The 'return' entry of the response, or None if it is missing or
          falsy.

        Raises:
          RenodeRobotError: The response is not a dictionary or its status
            is not 'PASS'.
        """
        reply = self._proxy.run_keyword(cmd, args)
        if not isinstance(reply, dict):
            raise RenodeRobotError('expected dictionary in response')
        if reply['status'] == 'PASS':
            # Falsy results are reported as None.
            return reply.get('return') or None
        raise RenodeRobotError(reply['error'])

    def reset(self) -> None:
        """Send the `ResetEmulation` keyword."""
        self._request('ResetEmulation', [])

    def cont(self) -> None:
        """Send the `StartEmulation` keyword."""
        self._request('StartEmulation', [])

    def list_properties(self, path: str) -> list[Any]:
        """Execute `path` as a command and return the result."""
        command = f'{path}'
        return self._request('ExecuteCommand', [command])

    def get_property(self, path: str, prop: str) -> Any:
        """Execute `<path> <prop>` as a command and return the result."""
        command = f'{path} {prop}'
        return self._request('ExecuteCommand', [command])

    def set_property(self, path: str, prop: str, value: Any) -> None:
        """Execute `<path> <prop> <value>` as a command."""
        command = f'{path} {prop} {value}'
        return self._request('ExecuteCommand', [command])
211