xref: /aosp_15_r20/external/pigweed/pw_emu/py/mock_emu_frontend.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Launcher and Connector for mock emulator"""
16
17import json
18import os
19from pathlib import Path
20from typing import Any
21import time
22
23from pw_emu.core import (
24    Connector,
25    Launcher,
26    InvalidProperty,
27    InvalidPropertyPath,
28)
29
# Command line that runs the mock emulator script; the script is looked up in
# the (resolved) directory that contains this module.
_mock_emu = [
    'python',
    str(Path(os.path.dirname(__file__)).resolve() / 'mock_emu.py'),
]
35
36
37def wait_for_file_size(
38    path: os.PathLike | str, size: int, timeout: int = 5
39) -> None:
40    deadline = time.monotonic() + timeout
41    while not os.path.exists(path):
42        if time.monotonic() > deadline:
43            break
44        time.sleep(0.1)
45
46    while os.path.getsize(path) < size:
47        if time.monotonic() > deadline:
48            break
49        time.sleep(0.1)
50
51
class MockEmuLauncher(Launcher):
    """Launcher implementation that starts the mock emulator script."""

    def __init__(
        self,
        config_path: Path,
    ):
        super().__init__('mock-emu', config_path)
        self._wdir: Path | None = None
        self.log = True

    def _pre_start(
        self,
        target: str,
        file: Path | None = None,
        pause: bool = False,
        debug: bool = False,
        args: str | None = None,
    ) -> list[str]:
        """Builds the command line used to start the mock emulator.

        When the target configuration has `pre-start-cmds`, two fixed channels
        (`test_subst_tcp` and `test_subst_pty`) are registered first. None of
        the arguments are used by this implementation.

        Returns:
            The mock emulator command, followed by one option pair per channel
            enabled in the emulator configuration, the working directory
            option when at least one channel is enabled, and a final message
            argument.
        """
        if self._config.get_target(['pre-start-cmds']):
            self._handles.add_channel_tcp('test_subst_tcp', 'localhost', 1234)
            self._handles.add_channel_pty('test_subst_pty', 'pty-path')

        # (config option, command line flag, channel name), in the order the
        # options are emitted.
        channel_table = (
            ('gdb_channel', '--tcp-channel', 'gdb'),
            ('tcp_channel', '--tcp-channel', 'tcp'),
            ('pty_channel', '--pty-channel', 'pty'),
        )
        channel_args: list[str] = []
        for option, flag, name in channel_table:
            if self._config.get_emu([option]):
                channel_args.extend((flag, name))

        if channel_args:
            channel_args.extend(('--working-dir', str(self._wdir)))
        return [*_mock_emu, *channel_args, 'starting mock emulator']

    def _post_start(self) -> None:
        """Registers the channels enabled in the emulator configuration.

        For every enabled channel, a file named after the channel is read from
        the working directory once it holds at least 5 bytes (or 5 seconds
        have passed): the TCP port for `gdb` and `tcp`, the pty path for
        `pty`. Does nothing when no working directory is set.
        """
        if not self._wdir:
            return

        for option, name in (('gdb_channel', 'gdb'), ('tcp_channel', 'tcp')):
            if not self._config.get_emu([option]):
                continue
            port_file = os.path.join(self._wdir, name)
            wait_for_file_size(port_file, 5, 5)
            with open(port_file, 'r') as stream:
                port = int(stream.read())
            self._handles.add_channel_tcp(name, 'localhost', port)

        if not self._config.get_emu(['pty_channel']):
            return
        pty_file = os.path.join(self._wdir, 'pty')
        wait_for_file_size(pty_file, 5, 5)
        with open(pty_file, 'r') as stream:
            pty_path = stream.read()
        self._handles.add_channel_pty('pty', pty_path)

    def _get_connector(self, wdir: Path) -> Connector:
        """Returns a `MockEmuConnector` created for `wdir`."""
        return MockEmuConnector(wdir)
112
113
class MockEmuConnector(Connector):
    """Connector for mock emulator.

    Properties are kept in a two-level mapping (property path -> property
    name -> value). `set_property` saves the mapping to `props.json` in the
    working directory (`self._wdir`) and `get_property` reloads it from there
    when the file can be read.
    """

    # Default properties. This dict is shared by all instances, so it must
    # never be modified in place; set_property() installs a per-instance copy.
    _props: dict[str, dict[str, Any]] = {
        'path1': {
            'prop1': 'val1',
        }
    }

    def reset(self) -> None:
        """Touches the `reset` file in the working directory."""
        Path(os.path.join(self._wdir, 'reset')).touch()

    def cont(self) -> None:
        """Touches the `cont` file in the working directory."""
        Path(os.path.join(self._wdir, 'cont')).touch()

    def _check_property(self, path: str, prop: str) -> None:
        """Raises if `path` or `prop` is not part of the current properties.

        Membership is tested instead of truthiness so that a property whose
        value is falsy (e.g. an empty string) is still a valid property.

        Raises:
            InvalidPropertyPath: `path` is not a known property path.
            InvalidProperty: `prop` does not exist under `path`.
        """
        if path not in self._props:
            raise InvalidPropertyPath(path)
        if prop not in self._props[path]:
            raise InvalidProperty(path, prop)

    def list_properties(self, path: str) -> list[Any]:
        """Returns the names of the properties under `path`.

        Raises:
            InvalidPropertyPath: `path` is not a known property path.
        """
        if path not in self._props:
            raise InvalidPropertyPath(path)
        return list(self._props[path])

    def set_property(self, path: str, prop: str, value: str) -> None:
        """Sets an existing property and saves all properties to `props.json`.

        Args:
            path: Property path.
            prop: Property name; it must already exist under `path`.
            value: New value for the property.

        Raises:
            InvalidPropertyPath: `path` is not a known property path.
            InvalidProperty: `prop` does not exist under `path`.
        """
        self._check_property(path, prop)
        # Build an updated copy rather than writing into the current mapping,
        # which may still be the class-level default shared by all instances.
        updated = dict(self._props)
        updated[path] = {**updated[path], prop: value}
        self._props = updated
        with open(os.path.join(self._wdir, 'props.json'), 'w') as file:
            json.dump(self._props, file)

    def get_property(self, path: str, prop: str) -> Any:
        """Returns the value of a property.

        The properties are reloaded from `props.json` first when that file can
        be opened; otherwise the in-memory values are used.

        Raises:
            InvalidPropertyPath: `path` is not a known property path.
            InvalidProperty: `prop` does not exist under `path`.
        """
        try:
            with open(os.path.join(self._wdir, 'props.json'), 'r') as file:
                self._props = json.load(file)
        except OSError:
            # Nothing readable has been saved; keep the in-memory properties.
            pass
        self._check_property(path, prop)
        return self._props[path][prop]
155