#!/usr/bin/env python # Copyright 2023 The Pigweed Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """Mock emulator used for testing process and channel management.""" import argparse import os import socket import sys import time from threading import Thread def _tcp_thread(sock: socket.socket) -> None: conn, _ = sock.accept() while True: data = conn.recv(1) conn.send(data) def _pty_thread(fd: int) -> None: while True: data = os.read(fd, 1) os.write(fd, data) def _get_parser() -> argparse.ArgumentParser: """Command line parser.""" parser = argparse.ArgumentParser() parser.add_argument( '-C', '--working-dir', metavar='PATH', help='working directory' ) parser.add_argument( 'echo', metavar='STRING', nargs='*', help='write STRING to stdout' ) parser.add_argument( '--tcp-channel', action='append', default=[], metavar='NAME', help='listen for TCP connections, write port WDIR/NAME', ) if sys.platform != 'win32': parser.add_argument( '--pty-channel', action='append', default=[], metavar='NAME', help='create pty channel and link in WDIR/NAME', ) parser.add_argument( '--exit', action='store_true', default=False, help='exit when done' ) return parser def main() -> None: """Mock emulator.""" args = _get_parser().parse_args() if len(args.echo) > 0: print(' '.join(args.echo)) sys.stdout.flush() threads = [] for chan in args.tcp_channel: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('localhost', 0)) port = sock.getsockname()[1] sock.listen() with open(os.path.join(args.working_dir, chan), 'w') as file: file.write(str(port)) thread = Thread(target=_tcp_thread, args=(sock,)) thread.start() threads.append(thread) if sys.platform != 'win32': for chan in args.pty_channel: controller, tty = os.openpty() with open(os.path.join(args.working_dir, chan), 'w') as file: file.write(os.ttyname(tty)) thread = Thread(target=_pty_thread, args=(controller,)) thread.start() threads.append(thread) for thread in threads: thread.join() if not args.exit: while True: time.sleep(1) if __name__ == '__main__': main()