xref: /aosp_15_r20/external/pigweed/pw_emu/py/mock_emu.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker#!/usr/bin/env python
2*61c4878aSAndroid Build Coastguard Worker# Copyright 2023 The Pigweed Authors
3*61c4878aSAndroid Build Coastguard Worker#
4*61c4878aSAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5*61c4878aSAndroid Build Coastguard Worker# use this file except in compliance with the License. You may obtain a copy of
6*61c4878aSAndroid Build Coastguard Worker# the License at
7*61c4878aSAndroid Build Coastguard Worker#
8*61c4878aSAndroid Build Coastguard Worker#     https://www.apache.org/licenses/LICENSE-2.0
9*61c4878aSAndroid Build Coastguard Worker#
10*61c4878aSAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
11*61c4878aSAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12*61c4878aSAndroid Build Coastguard Worker# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13*61c4878aSAndroid Build Coastguard Worker# License for the specific language governing permissions and limitations under
14*61c4878aSAndroid Build Coastguard Worker# the License.
15*61c4878aSAndroid Build Coastguard Worker"""Mock emulator used for testing process and channel management."""
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard Workerimport argparse
18*61c4878aSAndroid Build Coastguard Workerimport os
19*61c4878aSAndroid Build Coastguard Workerimport socket
20*61c4878aSAndroid Build Coastguard Workerimport sys
21*61c4878aSAndroid Build Coastguard Workerimport time
22*61c4878aSAndroid Build Coastguard Worker
23*61c4878aSAndroid Build Coastguard Workerfrom threading import Thread
24*61c4878aSAndroid Build Coastguard Worker
25*61c4878aSAndroid Build Coastguard Worker
def _tcp_thread(sock: socket.socket) -> None:
    """Accept one connection on `sock` and echo back every byte received.

    Args:
        sock: A listening TCP socket. Only the first connection is served.

    Returns once the peer closes the connection.
    """
    conn, _ = sock.accept()
    # The context manager closes the accepted connection on every exit path.
    with conn:
        while True:
            data = conn.recv(1)
            if not data:
                # recv() returns b'' once the peer has closed the connection;
                # without this check the loop would spin forever.
                break
            conn.sendall(data)
31*61c4878aSAndroid Build Coastguard Worker
32*61c4878aSAndroid Build Coastguard Worker
def _pty_thread(fd: int) -> None:
    """Echo every byte read from file descriptor `fd` back to the same `fd`.

    Args:
        fd: An open file descriptor that is both readable and writable.

    Returns once a read reports end-of-file.
    """
    while True:
        data = os.read(fd, 1)
        if not data:
            # os.read() returns b'' at end-of-file; stop here instead of
            # spinning on empty reads forever.
            break
        os.write(fd, data)
37*61c4878aSAndroid Build Coastguard Worker
38*61c4878aSAndroid Build Coastguard Worker
def _get_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the mock emulator command line."""

    cli = argparse.ArgumentParser()

    cli.add_argument(
        '-C',
        '--working-dir',
        help='working directory',
        metavar='PATH',
    )
    cli.add_argument(
        'echo',
        help='write STRING to stdout',
        nargs='*',
        metavar='STRING',
    )
    cli.add_argument(
        '--tcp-channel',
        help='listen for TCP connections, write port WDIR/NAME',
        metavar='NAME',
        default=[],
        action='append',
    )

    # The pty option is only registered on platforms other than Windows.
    pty_supported = sys.platform != 'win32'
    if pty_supported:
        cli.add_argument(
            '--pty-channel',
            help='create pty channel and link in WDIR/NAME',
            metavar='NAME',
            default=[],
            action='append',
        )

    cli.add_argument(
        '--exit',
        help='exit when done',
        default=False,
        action='store_true',
    )

    return cli
69*61c4878aSAndroid Build Coastguard Worker
70*61c4878aSAndroid Build Coastguard Worker
def main() -> None:
    """Mock emulator.

    Prints the positional STRING arguments (if any) to stdout, then starts one
    echo thread per requested channel:

    * For each --tcp-channel NAME, listens on an ephemeral localhost port and
      writes the port number to WDIR/NAME.
    * For each --pty-channel NAME (not available on Windows), opens a pty and
      writes the tty device name to WDIR/NAME.

    After all channel threads have finished, the process returns if --exit was
    given and otherwise sleeps forever.
    """

    args = _get_parser().parse_args()

    if args.echo:
        print(' '.join(args.echo))
    sys.stdout.flush()

    # -C/--working-dir is optional and defaults to None, which os.path.join()
    # rejects with a TypeError; fall back to the current directory instead.
    working_dir = args.working_dir or os.curdir

    threads = []

    for chan in args.tcp_channel:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Port 0 lets the OS pick a free port; publish the one it chose.
        sock.bind(('localhost', 0))
        port = sock.getsockname()[1]
        sock.listen()
        with open(os.path.join(working_dir, chan), 'w') as file:
            file.write(str(port))
        thread = Thread(target=_tcp_thread, args=(sock,))
        thread.start()
        threads.append(thread)

    if sys.platform != 'win32':
        for chan in args.pty_channel:
            # NOTE(review): `tty` is never closed here; presumably it is kept
            # open so the pty stays usable for the life of the process.
            controller, tty = os.openpty()
            with open(os.path.join(working_dir, chan), 'w') as file:
                file.write(os.ttyname(tty))
            thread = Thread(target=_pty_thread, args=(controller,))
            thread.start()
            threads.append(thread)

    for thread in threads:
        thread.join()

    if not args.exit:
        while True:
            time.sleep(1)
108*61c4878aSAndroid Build Coastguard Worker
109*61c4878aSAndroid Build Coastguard Worker
# Run the mock emulator only when this file is executed as a script.
if __name__ == '__main__':
    main()
112