xref: /aosp_15_r20/external/pigweed/pw_emu/py/tests/qemu_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python
2# Copyright 2023 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""QEMU emulator tests."""
16
17import json
18import os
19import socket
20import sys
21import tempfile
22import time
23import unittest
24
25from pathlib import Path
26from typing import Any
27
28from pw_emu.core import InvalidChannelName, InvalidChannelType
29from config_helper import check_prog, ConfigHelperWithEmulator
30
31
# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
# races with running this test and there is no way to add that package as a test
# dependency without creating circular dependencies. This means we can't rely on
# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
#
# Instead, run the arm_gdb.py wrapper directly.
# Absolute path of the arm_gdb.py entry point inside the Pigweed checkout that
# the PW_ROOT environment variable points at. Importing this module raises
# KeyError when PW_ROOT is not set.
_arm_none_eabi_gdb_path = Path(
    os.environ['PW_ROOT'],
    'pw_env_setup',
    'py',
    'pw_env_setup',
    'entry_points',
    'arm_gdb.py',
).resolve()
48
49
class TestQemu(ConfigHelperWithEmulator):
    """Tests for a valid qemu configuration.

    Every test runs against an emulator instance that ``setUp`` starts paused
    on ``test-target`` and that ``tearDown`` stops again.
    """

    _config = {
        # gdb is launched through the arm_gdb.py wrapper script directly.
        'gdb': ['python', str(_arm_none_eabi_gdb_path)],
        'qemu': {
            'executable': 'qemu-system-arm',
        },
        'targets': {
            'test-target': {
                # NOTE(review): the 'ignore*' keys look like filler that the
                # qemu backend is expected to skip over - confirm.
                'ignore1': None,
                'qemu': {
                    'machine': 'lm3s6965evb',
                    'channels': {
                        'chardevs': {
                            'test_uart': {
                                'id': 'serial0',
                            }
                        }
                    },
                },
                'ignore2': None,
            }
        },
    }

    def setUp(self) -> None:
        """Start the emulator, paused, on the test target."""
        super().setUp()
        # No image so start paused to avoid crashing.
        self._emu.start(target='test-target', pause=True)

    def tearDown(self) -> None:
        """Stop the emulator started by ``setUp``."""
        self._emu.stop()
        super().tearDown()

    def test_running(self) -> None:
        """The emulator reports itself as running after start."""
        self.assertTrue(self._emu.running())

    def test_list_properties(self) -> None:
        """Listing the properties of ``/machine`` yields a result."""
        self.assertIsNotNone(self._emu.list_properties('/machine'))

    def test_get_property(self) -> None:
        """The machine type property matches the configured machine."""
        self.assertEqual(
            self._emu.get_property('/machine', 'type'), 'lm3s6965evb-machine'
        )

    def test_set_property(self) -> None:
        """A property that was set can be read back."""
        self._emu.set_property('/machine', 'graphics', False)
        self.assertFalse(self._emu.get_property('/machine', 'graphics'))

    def test_bad_channel_name(self) -> None:
        """Asking for an unconfigured channel raises InvalidChannelName."""
        with self.assertRaises(InvalidChannelName):
            self._emu.get_channel_addr('serial1')

    def get_reg(self, addr: int) -> bytes:
        """Read one byte at ``addr`` from the emulated target using gdb.

        gdb is asked to dump the byte into a temporary file, which is then
        read back. The temporary file is removed even when the gdb run or one
        of the assertions fails.

        Args:
            addr: Target address to read.

        Returns:
            The single byte that gdb dumped.

        Raises:
            AssertionError: If gdb exits with a non-zero status or the dump
                file is empty.
        """
        # Only the path is needed: the file is closed right away and gdb
        # writes the dump to it by name.
        temp = tempfile.NamedTemporaryFile(delete=False)
        temp.close()

        try:
            res = self._emu.run_gdb_cmds(
                [
                    f'dump val {temp.name} *(char*){addr}',
                    'disconnect',
                ]
            )
            # The message is built eagerly, so decode leniently: unexpected
            # non-ASCII gdb output must not turn into a UnicodeDecodeError.
            gdb_stderr = res.stderr.decode('ascii', errors='replace')
            self.assertEqual(res.returncode, 0, gdb_stderr)

            with open(temp.name, 'rb') as file:
                ret = file.read(1)

            self.assertNotEqual(ret, b'', gdb_stderr)
        finally:
            os.unlink(temp.name)

        return ret

    def poll_data(self, timeout: int) -> bytes | None:
        """Wait for the status register to become non-zero, then read data.

        Args:
            timeout: Maximum time to wait, in seconds.

        Returns:
            The byte read from the data register, or ``None`` if the status
            register was still zero once the deadline had passed.
        """
        # NOTE(review): presumably the UART0 raw interrupt status and data
        # registers of the lm3s6965 - confirm against the datasheet.
        uartris = 0x4000C03C
        uartrd = 0x4000C000

        deadline = time.monotonic() + timeout
        while self.get_reg(uartris) == b'\x00':
            time.sleep(0.1)
            if time.monotonic() > deadline:
                return None
        return self.get_reg(uartrd)

    def test_channel_stream(self) -> None:
        """Bytes written to the channel stream show up on the target side."""
        ok, msg = check_prog('arm-none-eabi-gdb')
        if not ok:
            self.skipTest(msg)

        stream = self._emu.get_channel_stream('test_uart')
        stream.write('test\n'.encode('ascii'))

        self.assertEqual(self.poll_data(5), b't')
        self.assertEqual(self.poll_data(5), b'e')
        self.assertEqual(self.poll_data(5), b's')
        self.assertEqual(self.poll_data(5), b't')

    def test_gdb(self) -> None:
        """Issue a gdb continue and wait for the emulator to stop running.

        The test waits up to five seconds. If the emulator is still running
        at the deadline, the test returns without failing.
        """
        self._emu.run_gdb_cmds(['c'])
        deadline = time.monotonic() + 5
        while self._emu.running():
            if time.monotonic() > deadline:
                return
            # Yield between polls instead of spinning at full CPU.
            time.sleep(0.1)
        self.assertFalse(self._emu.running())
156
157
class TestQemuChannelsTcp(TestQemu):
    """Tests for configurations using TCP channels.

    Inherits every test from ``TestQemu`` and runs them with the global
    channel type set to ``tcp``.
    """

    # The JSON round trip makes an independent deep copy, so changing the
    # nested 'qemu' dict below does not modify TestQemu._config.
    _config: dict[str, Any] = {}
    _config.update(json.loads(json.dumps(TestQemu._config)))
    _config['qemu']['channels'] = {'type': 'tcp'}

    def test_get_channel_addr(self) -> None:
        """The address reported for the channel accepts a TCP connection."""
        host, port = self._emu.get_channel_addr('test_uart')
        # The context manager closes the socket even if connect() raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((host, port))
170
171
class TestQemuChannelsPty(TestQemu):
    """Tests for configurations using PTY channels.

    Reuses the tests inherited from ``TestQemu`` with the global channel type
    switched to ``pty``; the whole class is skipped on win32.
    """

    # Deep copy of the parent configuration (via a JSON round trip) so the
    # override below stays local to this class.
    _config: dict[str, Any] = json.loads(json.dumps(TestQemu._config))
    _config['qemu']['channels'] = {'type': 'pty'}

    def setUp(self) -> None:
        """Skip on win32, otherwise start the emulator as the parent does."""
        if sys.platform == 'win32':
            self.skipTest('pty not supported on win32')
        super().setUp()

    def test_get_path(self) -> None:
        """The path reported for the channel exists on the filesystem."""
        channel_path = self._emu.get_channel_path('test_uart')
        self.assertTrue(os.path.exists(channel_path))
186
187
class TestQemuInvalidChannelType(ConfigHelperWithEmulator):
    """Checks that an unknown channel type is rejected when starting."""

    _config = {
        'qemu': {
            'executable': 'qemu-system-arm',
            # Deliberately not a supported channel type.
            'channels': {'type': 'invalid'},
        },
        'targets': {
            'test-target': {
                'qemu': {
                    'machine': 'lm3s6965evb',
                }
            }
        },
    }

    def test_start(self) -> None:
        """Starting the target raises InvalidChannelType."""
        self.assertRaises(
            InvalidChannelType,
            self._emu.start,
            'test-target',
            pause=True,
        )
208
209
class TestQemuTargetChannelsMixed(ConfigHelperWithEmulator):
    """Test configuration with mixed channels types.

    The target declares three chardev channels: one without an explicit type,
    one typed ``tcp`` and one typed ``pty``. The class is skipped on win32
    because of the pty channel.
    """

    _config = {
        'qemu': {
            'executable': 'qemu-system-arm',
        },
        'targets': {
            'test-target': {
                'qemu': {
                    'machine': 'lm3s6965evb',
                    'channels': {
                        'chardevs': {
                            # No explicit type; test_uart0_addr expects it to
                            # be reachable over TCP.
                            'test_uart0': {
                                'id': 'serial0',
                            },
                            'test_uart1': {
                                'id': 'serial1',
                                'type': 'tcp',
                            },
                            'test_uart2': {
                                'id': 'serial2',
                                'type': 'pty',
                            },
                        }
                    },
                }
            }
        },
    }

    def setUp(self) -> None:
        """Skip on win32, otherwise start the emulator paused."""
        if sys.platform == 'win32':
            self.skipTest('pty not supported on win32')
        super().setUp()
        # no image to run so start paused
        self._emu.start('test-target', pause=True)

    def tearDown(self) -> None:
        """Stop the emulator started by ``setUp``."""
        self._emu.stop()
        super().tearDown()

    def _check_tcp_channel(self, name: str) -> None:
        """Open and close a TCP connection to the address of channel ``name``.

        The socket is used as a context manager so it is closed even when
        ``connect()`` raises.
        """
        host, port = self._emu.get_channel_addr(name)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((host, port))

    def test_uart0_addr(self) -> None:
        """The untyped channel accepts a TCP connection."""
        self._check_tcp_channel('test_uart0')

    def test_uart1_addr(self) -> None:
        """The explicitly tcp-typed channel accepts a TCP connection."""
        self._check_tcp_channel('test_uart1')

    def test_uart2_path(self) -> None:
        """The path reported for the pty-typed channel exists."""
        self.assertTrue(
            os.path.exists(self._emu.get_channel_path('test_uart2'))
        )
267        )
268
269
def main() -> None:
    """Run the unit tests, or exit successfully when qemu is unavailable.

    When ``check_prog`` reports that ``qemu-system-arm`` is not usable, the
    reason is printed and the process exits with status 0 so the missing
    emulator does not count as a test failure.
    """
    qemu_available, reason = check_prog('qemu-system-arm')
    if qemu_available:
        unittest.main()
        return

    print(f'skipping tests: {reason}')
    sys.exit(0)
277
278
# Run the tests only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
281