xref: /aosp_15_r20/external/pigweed/targets/rp2040/py/rp2040_utils/rpc_unit_test_runner.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2024 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""This script flashes and runs RPC unit tests on Raspberry Pi Pico boards."""
16
17import argparse
18import logging
19import sys
20from pathlib import Path
21
22import serial  # type: ignore
23
24import pw_cli.log
25from pw_hdlc import rpc
26from pw_log.proto import log_pb2
27from pw_rpc.callback_client.errors import RpcTimeout
28from pw_system import device
29from pw_tokenizer import detokenize
30from pw_unit_test_proto import unit_test_pb2
31
32from rp2040_utils.device_detector import PicoBoardInfo
33from rp2040_utils.flasher import (
34    create_flash_parser,
35    device_from_args,
36    flash,
37    find_elf,
38)
39
# Module-wide logger; getLogger() with no name returns the root logger.
_LOG = logging.getLogger()
41
42
def create_test_runner_parser() -> argparse.ArgumentParser:
    """Builds the argument parser used by the RPC unit test runner.

    Starts from the parser returned by ``create_flash_parser()``, replaces
    its description with this module's docstring, and adds the
    ``--test-timeout`` option (a float, 5.0 by default).
    """
    timeout_help = (
        'Maximum communication delay in seconds before a '
        'test is considered unresponsive and aborted'
    )

    runner_parser = create_flash_parser()
    runner_parser.description = __doc__
    runner_parser.add_argument(
        '--test-timeout', type=float, default=5.0, help=timeout_help
    )
    return runner_parser
55
56
def run_test_on_board(
    board: PicoBoardInfo,
    baud_rate: int,
    chip: str,
    binary: Path,
    test_timeout_seconds: float,
) -> bool:
    """Flash ``binary`` onto ``board`` and run its RPC unit tests.

    Args:
        board: Board to flash; its ``serial_port`` carries the RPC traffic.
        baud_rate: Baud rate used when opening the board's serial port.
        chip: Chip identifier forwarded to ``flash()``.
        binary: Test binary to flash. Also passed to ``find_elf()`` to
            locate the ELF given to the detokenizer.
        test_timeout_seconds: Timeout handed to ``run_tests()``.

    Returns:
        True only if flashing succeeded, an ELF was found, the test run
        completed without an RPC timeout, and all tests passed.
    """
    if not flash(board, chip, binary):
        return False

    # Resolve the ELF before touching the serial port so that a missing ELF
    # cannot leave an opened port behind on the early return.
    elf_path = find_elf(binary)
    if not elf_path:
        return False

    serial_device = serial.Serial(board.serial_port, baud_rate, timeout=0.1)
    try:
        reader = rpc.SerialReader(serial_device, 8192)
        with device.Device(
            channel_id=rpc.DEFAULT_CHANNEL_ID,
            reader=reader,
            write=serial_device.write,
            proto_library=[log_pb2, unit_test_pb2],
            detokenizer=detokenize.Detokenizer(elf_path),
        ) as dev:
            try:
                test_results = dev.run_tests(test_timeout_seconds)
            except RpcTimeout:
                _LOG.error(
                    'Test timed out after %s seconds.', test_timeout_seconds
                )
                return False
            else:
                _LOG.info('Test run complete')
            if not test_results.all_tests_passed():
                return False
        return True
    finally:
        # Runs after the Device context has exited, on every path (success,
        # failure, timeout, or exception), so the port is always released.
        serial_device.close()
91
92
def main():
    """Parse arguments, configure logging, then flash and test one board.

    Terminates the process via ``sys.exit`` with status 0 when
    ``run_test_on_board()`` reports success and 1 otherwise.
    """
    args = create_test_runner_parser().parse_args()

    # Log to stdout, which will be captured by the unit test server.
    if args.verbose:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    pw_cli.log.install(level=log_level)

    succeeded = run_test_on_board(
        device_from_args(args, interactive=False),
        args.baud,
        args.chip,
        args.binary,
        args.test_timeout,
    )

    exit_code = 0 if succeeded else 1
    sys.exit(exit_code)
105
106
# Run the test runner only when this file is executed as a script.
if __name__ == '__main__':
    main()
109