1#!/usr/bin/env python 2# Copyright 2023 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""QEMU emulator tests.""" 16 17import json 18import os 19import socket 20import sys 21import tempfile 22import time 23import unittest 24 25from pathlib import Path 26from typing import Any 27 28from pw_emu.core import InvalidChannelName, InvalidChannelType 29from config_helper import check_prog, ConfigHelperWithEmulator 30 31 32# TODO: b/301382004 - The Python Pigweed package install (into python-venv) 33# races with running this test and there is no way to add that package as a test 34# depedency without creating circular depedencies. This means we can't rely on 35# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper. 36# 37# run the arm_gdb.py wrapper directly 38_arm_none_eabi_gdb_path = Path( 39 os.path.join( 40 os.environ['PW_ROOT'], 41 'pw_env_setup', 42 'py', 43 'pw_env_setup', 44 'entry_points', 45 'arm_gdb.py', 46 ) 47).resolve() 48 49 50class TestQemu(ConfigHelperWithEmulator): 51 """Tests for a valid qemu configuration.""" 52 53 _config = { 54 'gdb': ['python', str(_arm_none_eabi_gdb_path)], 55 'qemu': { 56 'executable': 'qemu-system-arm', 57 }, 58 'targets': { 59 'test-target': { 60 'ignore1': None, 61 'qemu': { 62 'machine': 'lm3s6965evb', 63 'channels': { 64 'chardevs': { 65 'test_uart': { 66 'id': 'serial0', 67 } 68 } 69 }, 70 }, 71 'ignore2': None, 72 } 73 }, 74 } 75 76 def setUp(self) -> None: 77 super().setUp() 78 # No image so start paused to avoid crashing. 79 self._emu.start(target='test-target', pause=True) 80 81 def tearDown(self) -> None: 82 self._emu.stop() 83 super().tearDown() 84 85 def test_running(self) -> None: 86 self.assertTrue(self._emu.running()) 87 88 def test_list_properties(self) -> None: 89 self.assertIsNotNone(self._emu.list_properties('/machine')) 90 91 def test_get_property(self) -> None: 92 self.assertEqual( 93 self._emu.get_property('/machine', 'type'), 'lm3s6965evb-machine' 94 ) 95 96 def test_set_property(self) -> None: 97 self._emu.set_property('/machine', 'graphics', False) 98 self.assertFalse(self._emu.get_property('/machine', 'graphics')) 99 100 def test_bad_channel_name(self) -> None: 101 with self.assertRaises(InvalidChannelName): 102 self._emu.get_channel_addr('serial1') 103 104 def get_reg(self, addr: int) -> bytes: 105 temp = tempfile.NamedTemporaryFile(delete=False) 106 temp.close() 107 108 res = self._emu.run_gdb_cmds( 109 [ 110 f'dump val {temp.name} *(char*){addr}', 111 'disconnect', 112 ] 113 ) 114 self.assertEqual(res.returncode, 0, res.stderr.decode('ascii')) 115 116 with open(temp.name, 'rb') as file: 117 ret = file.read(1) 118 119 self.assertNotEqual(ret, b'', res.stderr.decode('ascii')) 120 121 os.unlink(temp.name) 122 123 return ret 124 125 def poll_data(self, timeout: int) -> bytes | None: 126 uartris = 0x4000C03C 127 uartrd = 0x4000C000 128 129 deadline = time.monotonic() + timeout 130 while self.get_reg(uartris) == b'\x00': 131 time.sleep(0.1) 132 if time.monotonic() > deadline: 133 return None 134 return self.get_reg(uartrd) 135 136 def test_channel_stream(self) -> None: 137 ok, msg = check_prog('arm-none-eabi-gdb') 138 if not ok: 139 self.skipTest(msg) 140 141 stream = self._emu.get_channel_stream('test_uart') 142 stream.write('test\n'.encode('ascii')) 143 144 self.assertEqual(self.poll_data(5), b't') 145 self.assertEqual(self.poll_data(5), b'e') 146 self.assertEqual(self.poll_data(5), b's') 147 self.assertEqual(self.poll_data(5), b't') 148 149 def test_gdb(self) -> None: 150 self._emu.run_gdb_cmds(['c']) 151 deadline = time.monotonic() + 5 152 while self._emu.running(): 153 if time.monotonic() > deadline: 154 return 155 self.assertFalse(self._emu.running()) 156 157 158class TestQemuChannelsTcp(TestQemu): 159 """Tests for configurations using TCP channels.""" 160 161 _config: dict[str, Any] = {} 162 _config.update(json.loads(json.dumps(TestQemu._config))) 163 _config['qemu']['channels'] = {'type': 'tcp'} 164 165 def test_get_channel_addr(self) -> None: 166 host, port = self._emu.get_channel_addr('test_uart') 167 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 168 sock.connect((host, port)) 169 sock.close() 170 171 172class TestQemuChannelsPty(TestQemu): 173 """Tests for configurations using PTY channels.""" 174 175 _config: dict[str, Any] = {} 176 _config.update(json.loads(json.dumps(TestQemu._config))) 177 _config['qemu']['channels'] = {'type': 'pty'} 178 179 def setUp(self): 180 if sys.platform == 'win32': 181 self.skipTest('pty not supported on win32') 182 super().setUp() 183 184 def test_get_path(self) -> None: 185 self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart'))) 186 187 188class TestQemuInvalidChannelType(ConfigHelperWithEmulator): 189 """Test invalid channel type configuration.""" 190 191 _config = { 192 'qemu': { 193 'executable': 'qemu-system-arm', 194 'channels': {'type': 'invalid'}, 195 }, 196 'targets': { 197 'test-target': { 198 'qemu': { 199 'machine': 'lm3s6965evb', 200 } 201 } 202 }, 203 } 204 205 def test_start(self) -> None: 206 with self.assertRaises(InvalidChannelType): 207 self._emu.start('test-target', pause=True) 208 209 210class TestQemuTargetChannelsMixed(ConfigHelperWithEmulator): 211 """Test configuration with mixed channels types.""" 212 213 _config = { 214 'qemu': { 215 'executable': 'qemu-system-arm', 216 }, 217 'targets': { 218 'test-target': { 219 'qemu': { 220 'machine': 'lm3s6965evb', 221 'channels': { 222 'chardevs': { 223 'test_uart0': { 224 'id': 'serial0', 225 }, 226 'test_uart1': { 227 'id': 'serial1', 228 'type': 'tcp', 229 }, 230 'test_uart2': { 231 'id': 'serial2', 232 'type': 'pty', 233 }, 234 } 235 }, 236 } 237 } 238 }, 239 } 240 241 def setUp(self) -> None: 242 if sys.platform == 'win32': 243 self.skipTest('pty not supported on win32') 244 super().setUp() 245 # no image to run so start paused 246 self._emu.start('test-target', pause=True) 247 248 def tearDown(self) -> None: 249 self._emu.stop() 250 super().tearDown() 251 252 def test_uart0_addr(self) -> None: 253 host, port = self._emu.get_channel_addr('test_uart0') 254 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 255 sock.connect((host, port)) 256 sock.close() 257 258 def test_uart1_addr(self) -> None: 259 host, port = self._emu.get_channel_addr('test_uart1') 260 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 261 sock.connect((host, port)) 262 sock.close() 263 264 def test_uart2_path(self) -> None: 265 self.assertTrue( 266 os.path.exists(self._emu.get_channel_path('test_uart2')) 267 ) 268 269 270def main() -> None: 271 ok, msg = check_prog('qemu-system-arm') 272 if not ok: 273 print(f'skipping tests: {msg}') 274 sys.exit(0) 275 276 unittest.main() 277 278 279if __name__ == '__main__': 280 main() 281