#!/usr/bin/env python3
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Python client for pw_transfer integration test."""

import logging
import socket
import sys

from google.protobuf import text_format
from pw_hdlc.rpc import HdlcRpcClient, default_channels, SocketReader
from pw_status import Status
import pw_transfer
from pw_transfer import transfer_pb2
from pw_transfer.integration_test import config_pb2

_LOG = logging.getLogger('pw_transfer_integration_test_python_client')
_LOG.level = logging.DEBUG
_LOG.addHandler(logging.StreamHandler(sys.stdout))

HOSTNAME: str = "localhost"


def _perform_transfer_action(
    action: config_pb2.TransferAction, transfer_manager: pw_transfer.Manager
) -> bool:
    """Performs the transfer action and returns True on success.

    For a WRITE_TO_SERVER action the payload is read from ``action.file_path``
    and sent with ``transfer_manager.write``. For a READ_FROM_SERVER action the
    data returned by ``transfer_manager.read`` is written to
    ``action.file_path``.

    Args:
        action: The transfer to run; supplies the transfer type, resource ID,
            file path, protocol version, initial offset and expected status.
        transfer_manager: The manager used to issue the read or write.

    Returns:
        True if the transfer completed and ``action.expected_status`` is OK, or
        if it failed with a ``pw_transfer.client.Error`` whose status equals
        ``action.expected_status``. False in every other case, including file
        I/O failures and unknown transfer types.
    """
    protocol_version = pw_transfer.ProtocolVersion(int(action.protocol_version))

    # Default to the latest protocol version if none is specified.
    if protocol_version == pw_transfer.ProtocolVersion.UNKNOWN:
        protocol_version = pw_transfer.ProtocolVersion.LATEST

    if (
        action.transfer_type
        == config_pb2.TransferAction.TransferType.WRITE_TO_SERVER
    ):
        try:
            with open(action.file_path, 'rb') as f:
                data = f.read()
        # ValueError covers paths open() rejects outright (e.g. embedded NUL).
        except (OSError, ValueError):
            _LOG.critical(
                "Failed to read input file '%s'",
                action.file_path,
                exc_info=True,
            )
            return False

        try:
            transfer_manager.write(
                action.resource_id,
                data,
                protocol_version=protocol_version,
                initial_offset=action.initial_offset,
            )
        except pw_transfer.client.Error as e:
            # A transfer error is a pass only when it is the configured one.
            if e.status != Status(action.expected_status):
                _LOG.exception(
                    "Unexpected error encountered during write transfer"
                )
                return False
            return True
        # Exception rather than a bare except, so that KeyboardInterrupt and
        # SystemExit are not reported as transfer failures.
        except Exception:  # pylint: disable=broad-except
            _LOG.exception("Transfer (write to server) failed")
            return False
    elif (
        action.transfer_type
        == config_pb2.TransferAction.TransferType.READ_FROM_SERVER
    ):
        try:
            data = transfer_manager.read(
                action.resource_id,
                protocol_version=protocol_version,
                initial_offset=action.initial_offset,
            )
        except pw_transfer.client.Error as e:
            # A transfer error is a pass only when it is the configured one.
            if e.status != Status(action.expected_status):
                _LOG.exception(
                    "Unexpected error encountered during read transfer"
                )
                return False
            return True
        except Exception:  # pylint: disable=broad-except
            _LOG.exception("Transfer (read from server) failed")
            return False

        try:
            with open(action.file_path, 'wb') as f:
                f.write(data)
        except (OSError, ValueError):
            _LOG.critical(
                "Failed to write output file '%s'",
                action.file_path,
                exc_info=True,
            )
            return False
    else:
        _LOG.critical("Unknown transfer type: %d", action.transfer_type)
        return False

    # The transfer completed; that is only a pass if success was expected.
    if Status(action.expected_status) != Status.OK:
        _LOG.error("Transfer was not expected to succeed")
        return False
    return True


def _run_transfers(
    rpc_socket: socket.socket, config: config_pb2.ClientConfig
) -> int:
    """Runs every configured transfer action over a connected socket.

    Args:
        rpc_socket: A socket already connected to the server.
        config: The client configuration; supplies the timeouts, retry limits
            and the list of transfer actions.

    Returns:
        0 if every action passed, 1 as soon as one action fails.
    """
    # Initialize an RPC client using a socket reader and set up the
    # pw_transfer manager.
    reader = SocketReader(rpc_socket, 4096)
    with reader:
        rpc_client = HdlcRpcClient(
            reader,
            [transfer_pb2],
            default_channels(rpc_socket.sendall),
            lambda data: _LOG.info("%s", str(data)),
        )
        with rpc_client:
            transfer_service = rpc_client.rpcs().pw.transfer.Transfer
            transfer_manager = pw_transfer.Manager(
                transfer_service,
                # The config stores timeouts in milliseconds; the manager
                # parameters are named in seconds.
                default_response_timeout_s=config.chunk_timeout_ms / 1000,
                initial_response_timeout_s=config.initial_chunk_timeout_ms
                / 1000,
                max_retries=config.max_retries,
                max_lifetime_retries=config.max_lifetime_retries,
                default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
            )

            transfer_logger = logging.getLogger('pw_transfer')
            transfer_logger.setLevel(logging.DEBUG)
            transfer_logger.addHandler(logging.StreamHandler(sys.stdout))

            # Perform the requested transfer actions.
            for action in config.transfer_actions:
                if not _perform_transfer_action(action, transfer_manager):
                    return 1

    _LOG.info("All transfers completed successfully")
    return 0


def _main() -> int:
    """Parses the port and config, connects, and runs the transfers.

    The port is the single command-line argument and the text-format
    ``ClientConfig`` is read from stdin.

    Returns:
        The process exit code: 0 if all transfer actions passed, 1 on a usage
        error, an unparsable config, a connection failure or a failed action.
    """
    if len(sys.argv) != 2:
        _LOG.critical("Usage: PORT")
        return 1

    # The port is passed via the command line.
    try:
        port = int(sys.argv[1])
    except ValueError:
        _LOG.critical("Invalid port specified.")
        return 1

    # Load the config from stdin.
    try:
        text_config = sys.stdin.buffer.read()
        config = text_format.Parse(text_config, config_pb2.ClientConfig())
    except Exception as e:  # pylint: disable=broad-except
        _LOG.critical("Failed to parse config file from stdin: %s", e)
        return 1

    # Open a connection to the server.
    rpc_socket = None
    try:
        rpc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        rpc_socket.connect((HOSTNAME, port))
    # connect() raises OverflowError for a port outside 0-65535.
    except (OSError, OverflowError):
        # Release the descriptor instead of leaking it on a failed connect.
        if rpc_socket is not None:
            rpc_socket.close()
        _LOG.critical(
            "Failed to connect to server at %s:%d",
            HOSTNAME,
            port,
            exc_info=True,
        )
        return 1

    # Hold the socket in a context manager so it is closed on every exit
    # path; leaving the block is harmless if it has already been closed.
    with rpc_socket:
        return _run_transfers(rpc_socket, config)


if __name__ == '__main__':
    sys.exit(_main())