1#!/usr/bin/env python 2# Copyright 2023 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Launcher and Connector for mock emulator""" 16 17import json 18import os 19from pathlib import Path 20from typing import Any 21import time 22 23from pw_emu.core import ( 24 Connector, 25 Launcher, 26 InvalidProperty, 27 InvalidPropertyPath, 28) 29 30# mock emulator script 31_mock_emu = [ 32 'python', 33 os.path.join(Path(os.path.dirname(__file__)).resolve(), 'mock_emu.py'), 34] 35 36 37def wait_for_file_size( 38 path: os.PathLike | str, size: int, timeout: int = 5 39) -> None: 40 deadline = time.monotonic() + timeout 41 while not os.path.exists(path): 42 if time.monotonic() > deadline: 43 break 44 time.sleep(0.1) 45 46 while os.path.getsize(path) < size: 47 if time.monotonic() > deadline: 48 break 49 time.sleep(0.1) 50 51 52class MockEmuLauncher(Launcher): 53 """Launcher for mock emulator""" 54 55 def __init__( 56 self, 57 config_path: Path, 58 ): 59 super().__init__('mock-emu', config_path) 60 self._wdir: Path | None = None 61 self.log = True 62 63 def _pre_start( 64 self, 65 target: str, 66 file: Path | None = None, 67 pause: bool = False, 68 debug: bool = False, 69 args: str | None = None, 70 ) -> list[str]: 71 channels = [] 72 if self._config.get_target(['pre-start-cmds']): 73 self._handles.add_channel_tcp('test_subst_tcp', 'localhost', 1234) 74 self._handles.add_channel_pty('test_subst_pty', 'pty-path') 75 if self._config.get_emu(['gdb_channel']): 76 channels += ['--tcp-channel', 'gdb'] 77 if self._config.get_emu(['tcp_channel']): 78 channels += ['--tcp-channel', 'tcp'] 79 if self._config.get_emu(['pty_channel']): 80 channels += ['--pty-channel', 'pty'] 81 if len(channels) > 0: 82 channels += ['--working-dir', str(self._wdir)] 83 return _mock_emu + channels + ['starting mock emulator'] 84 85 def _post_start(self) -> None: 86 if not self._wdir: 87 return 88 89 if self._config.get_emu(['gdb_channel']): 90 path = os.path.join(self._wdir, 'gdb') 91 wait_for_file_size(path, 5, 5) 92 with open(path, 'r') as file: 93 port = int(file.read()) 94 self._handles.add_channel_tcp('gdb', 'localhost', port) 95 96 if self._config.get_emu(['tcp_channel']): 97 path = os.path.join(self._wdir, 'tcp') 98 wait_for_file_size(path, 5, 5) 99 with open(path, 'r') as file: 100 port = int(file.read()) 101 self._handles.add_channel_tcp('tcp', 'localhost', port) 102 103 if self._config.get_emu(['pty_channel']): 104 path = os.path.join(self._wdir, 'pty') 105 wait_for_file_size(path, 5, 5) 106 with open(path, 'r') as file: 107 pty_path = file.read() 108 self._handles.add_channel_pty('pty', pty_path) 109 110 def _get_connector(self, wdir: Path) -> Connector: 111 return MockEmuConnector(wdir) 112 113 114class MockEmuConnector(Connector): 115 """Connector for mock emulator""" 116 117 _props = { 118 'path1': { 119 'prop1': 'val1', 120 } 121 } 122 123 def reset(self) -> None: 124 Path(os.path.join(self._wdir, 'reset')).touch() 125 126 def cont(self) -> None: 127 Path(os.path.join(self._wdir, 'cont')).touch() 128 129 def list_properties(self, path: str) -> list[Any]: 130 try: 131 return list(self._props[path].keys()) 132 except KeyError: 133 raise InvalidPropertyPath(path) 134 135 def set_property(self, path: str, prop: str, value: str) -> None: 136 if not self._props.get(path): 137 raise InvalidPropertyPath(path) 138 if not self._props[path].get(prop): 139 raise InvalidProperty(path, prop) 140 self._props[path][prop] = value 141 with open(os.path.join(self._wdir, 'props.json'), 'w') as file: 142 json.dump(self._props, file) 143 144 def get_property(self, path: str, prop: str) -> Any: 145 try: 146 with open(os.path.join(self._wdir, 'props.json'), 'r') as file: 147 self._props = json.load(file) 148 except OSError: 149 pass 150 if not self._props.get(path): 151 raise InvalidPropertyPath(path) 152 if not self._props[path].get(prop): 153 raise InvalidProperty(path, prop) 154 return self._props[path][prop] 155