xref: /aosp_15_r20/external/pigweed/pw_transfer/integration_test/python_client.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2022 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Python client for pw_transfer integration test."""
16
17import logging
18import socket
19import sys
20
21from google.protobuf import text_format
22from pw_hdlc.rpc import HdlcRpcClient, default_channels, SocketReader
23from pw_status import Status
24import pw_transfer
25from pw_transfer import transfer_pb2
26from pw_transfer.integration_test import config_pb2
27
28_LOG = logging.getLogger('pw_transfer_integration_test_python_client')
29_LOG.level = logging.DEBUG
30_LOG.addHandler(logging.StreamHandler(sys.stdout))
31
32HOSTNAME: str = "localhost"
33
34
35def _perform_transfer_action(
36    action: config_pb2.TransferAction, transfer_manager: pw_transfer.Manager
37) -> bool:
38    """Performs the transfer action and returns True on success."""
39    protocol_version = pw_transfer.ProtocolVersion(int(action.protocol_version))
40
41    # Default to the latest protocol version if none is specified.
42    if protocol_version == pw_transfer.ProtocolVersion.UNKNOWN:
43        protocol_version = pw_transfer.ProtocolVersion.LATEST
44
45    if (
46        action.transfer_type
47        == config_pb2.TransferAction.TransferType.WRITE_TO_SERVER
48    ):
49        try:
50            with open(action.file_path, 'rb') as f:
51                data = f.read()
52        except:
53            _LOG.critical("Failed to read input file '%s'", action.file_path)
54            return False
55
56        try:
57            transfer_manager.write(
58                action.resource_id,
59                data,
60                protocol_version=protocol_version,
61                initial_offset=action.initial_offset,
62            )
63        except pw_transfer.client.Error as e:
64            if e.status != Status(action.expected_status):
65                _LOG.exception(
66                    "Unexpected error encountered during write transfer"
67                )
68                return False
69            return True
70        except:
71            _LOG.exception("Transfer (write to server) failed")
72            return False
73    elif (
74        action.transfer_type
75        == config_pb2.TransferAction.TransferType.READ_FROM_SERVER
76    ):
77        try:
78            data = transfer_manager.read(
79                action.resource_id,
80                protocol_version=protocol_version,
81                initial_offset=action.initial_offset,
82            )
83        except pw_transfer.client.Error as e:
84            if e.status != Status(action.expected_status):
85                _LOG.exception(
86                    "Unexpected error encountered during read transfer"
87                )
88                return False
89            return True
90        except:
91            _LOG.exception("Transfer (read from server) failed")
92            return False
93
94        try:
95            with open(action.file_path, 'wb') as f:
96                f.write(data)
97        except:
98            _LOG.critical("Failed to write output file '%s'", action.file_path)
99            return False
100    else:
101        _LOG.critical("Unknown transfer type: %d", action.transfer_type)
102        return False
103    if Status(action.expected_status) != Status.OK:
104        _LOG.error("Transfer was not expected to succeed")
105        return False
106    return True
107
108
109def _main() -> int:
110    if len(sys.argv) != 2:
111        _LOG.critical("Usage: PORT")
112        return 1
113
114    # The port is passed via the command line.
115    try:
116        port = int(sys.argv[1])
117    except:
118        _LOG.critical("Invalid port specified.")
119        return 1
120
121    # Load the config from stdin.
122    try:
123        text_config = sys.stdin.buffer.read()
124        config = text_format.Parse(text_config, config_pb2.ClientConfig())
125    except Exception as e:
126        _LOG.critical("Failed to parse config file from stdin: %s", e)
127        return 1
128
129    # Open a connection to the server.
130    try:
131        rpc_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
132        rpc_socket.connect((HOSTNAME, port))
133    except:
134        _LOG.critical("Failed to connect to server at %s:%d", HOSTNAME, port)
135        return 1
136
137    # Initialize an RPC client using a socket reader and set up the
138    # pw_transfer manager.
139    reader = SocketReader(rpc_socket, 4096)
140    with reader:
141        rpc_client = HdlcRpcClient(
142            reader,
143            [transfer_pb2],
144            default_channels(lambda data: rpc_socket.sendall(data)),
145            lambda data: _LOG.info("%s", str(data)),
146        )
147        with rpc_client:
148            transfer_service = rpc_client.rpcs().pw.transfer.Transfer
149            transfer_manager = pw_transfer.Manager(
150                transfer_service,
151                default_response_timeout_s=config.chunk_timeout_ms / 1000,
152                initial_response_timeout_s=config.initial_chunk_timeout_ms
153                / 1000,
154                max_retries=config.max_retries,
155                max_lifetime_retries=config.max_lifetime_retries,
156                default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
157            )
158
159            transfer_logger = logging.getLogger('pw_transfer')
160            transfer_logger.setLevel(logging.DEBUG)
161            transfer_logger.addHandler(logging.StreamHandler(sys.stdout))
162
163            # Perform the requested transfer actions.
164            for action in config.transfer_actions:
165                if not _perform_transfer_action(action, transfer_manager):
166                    return 1
167
168    _LOG.info("All transfers completed successfully")
169    return 0
170
171
172if __name__ == '__main__':
173    sys.exit(_main())
174