1*61c4878aSAndroid Build Coastguard Worker#!/usr/bin/env python3 2*61c4878aSAndroid Build Coastguard Worker# Copyright 2022 The Pigweed Authors 3*61c4878aSAndroid Build Coastguard Worker# 4*61c4878aSAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5*61c4878aSAndroid Build Coastguard Worker# use this file except in compliance with the License. You may obtain a copy of 6*61c4878aSAndroid Build Coastguard Worker# the License at 7*61c4878aSAndroid Build Coastguard Worker# 8*61c4878aSAndroid Build Coastguard Worker# https://www.apache.org/licenses/LICENSE-2.0 9*61c4878aSAndroid Build Coastguard Worker# 10*61c4878aSAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software 11*61c4878aSAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12*61c4878aSAndroid Build Coastguard Worker# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13*61c4878aSAndroid Build Coastguard Worker# License for the specific language governing permissions and limitations under 14*61c4878aSAndroid Build Coastguard Worker# the License. 15*61c4878aSAndroid Build Coastguard Worker"""Proxy for transfer integration testing. 16*61c4878aSAndroid Build Coastguard Worker 17*61c4878aSAndroid Build Coastguard WorkerThis module contains a proxy for transfer intergation testing. It is capable 18*61c4878aSAndroid Build Coastguard Workerof introducing various link failures into the connection between the client and 19*61c4878aSAndroid Build Coastguard Workerserver. 20*61c4878aSAndroid Build Coastguard Worker""" 21*61c4878aSAndroid Build Coastguard Worker 22*61c4878aSAndroid Build Coastguard Workerimport abc 23*61c4878aSAndroid Build Coastguard Workerimport argparse 24*61c4878aSAndroid Build Coastguard Workerimport asyncio 25*61c4878aSAndroid Build Coastguard Workerfrom enum import Enum 26*61c4878aSAndroid Build Coastguard Workerimport logging 27*61c4878aSAndroid Build Coastguard Workerimport random 28*61c4878aSAndroid Build Coastguard Workerimport socket 29*61c4878aSAndroid Build Coastguard Workerimport sys 30*61c4878aSAndroid Build Coastguard Workerimport time 31*61c4878aSAndroid Build Coastguard Workerfrom typing import Awaitable, Callable, Iterable, NamedTuple 32*61c4878aSAndroid Build Coastguard Worker 33*61c4878aSAndroid Build Coastguard Workerfrom google.protobuf import text_format 34*61c4878aSAndroid Build Coastguard Worker 35*61c4878aSAndroid Build Coastguard Workerfrom pw_rpc.internal import packet_pb2 36*61c4878aSAndroid Build Coastguard Workerfrom pw_transfer import transfer_pb2 37*61c4878aSAndroid Build Coastguard Workerfrom pw_transfer.integration_test import config_pb2 38*61c4878aSAndroid Build Coastguard Workerfrom pw_hdlc import decode 39*61c4878aSAndroid Build Coastguard Workerfrom pw_transfer.chunk import Chunk 40*61c4878aSAndroid Build Coastguard Worker 41*61c4878aSAndroid Build Coastguard Worker_LOG = logging.getLogger('pw_transfer_intergration_test_proxy') 42*61c4878aSAndroid Build Coastguard Worker 43*61c4878aSAndroid Build Coastguard Worker# This is the maximum size of the socket receive buffers. Ideally, this is set 44*61c4878aSAndroid Build Coastguard Worker# to the lowest allowed value to minimize buffering between the proxy and 45*61c4878aSAndroid Build Coastguard Worker# clients so rate limiting causes the client to block and wait for the 46*61c4878aSAndroid Build Coastguard Worker# integration test proxy to drain rather than allowing OS buffers to backlog 47*61c4878aSAndroid Build Coastguard Worker# large quantities of data. 48*61c4878aSAndroid Build Coastguard Worker# 49*61c4878aSAndroid Build Coastguard Worker# Note that the OS may chose to not strictly follow this requested buffer size. 50*61c4878aSAndroid Build Coastguard Worker# Still, setting this value to be relatively small does reduce bufer sizes 51*61c4878aSAndroid Build Coastguard Worker# significantly enough to better reflect typical inter-device communication. 52*61c4878aSAndroid Build Coastguard Worker# 53*61c4878aSAndroid Build Coastguard Worker# For this to be effective, clients should also configure their sockets to a 54*61c4878aSAndroid Build Coastguard Worker# smaller send buffer size. 55*61c4878aSAndroid Build Coastguard Worker_RECEIVE_BUFFER_SIZE = 2048 56*61c4878aSAndroid Build Coastguard Worker 57*61c4878aSAndroid Build Coastguard Worker 58*61c4878aSAndroid Build Coastguard Workerclass EventType(Enum): 59*61c4878aSAndroid Build Coastguard Worker TRANSFER_START = 1 60*61c4878aSAndroid Build Coastguard Worker PARAMETERS_RETRANSMIT = 2 61*61c4878aSAndroid Build Coastguard Worker PARAMETERS_CONTINUE = 3 62*61c4878aSAndroid Build Coastguard Worker START_ACK_CONFIRMATION = 4 63*61c4878aSAndroid Build Coastguard Worker 64*61c4878aSAndroid Build Coastguard Worker 65*61c4878aSAndroid Build Coastguard Workerclass Event(NamedTuple): 66*61c4878aSAndroid Build Coastguard Worker type: EventType 67*61c4878aSAndroid Build Coastguard Worker chunk: Chunk 68*61c4878aSAndroid Build Coastguard Worker 69*61c4878aSAndroid Build Coastguard Worker 70*61c4878aSAndroid Build Coastguard Workerclass Filter(abc.ABC): 71*61c4878aSAndroid Build Coastguard Worker """An abstract interface for manipulating a stream of data. 72*61c4878aSAndroid Build Coastguard Worker 73*61c4878aSAndroid Build Coastguard Worker ``Filter``s are used to implement various transforms to simulate real 74*61c4878aSAndroid Build Coastguard Worker world link properties. Some examples include: data corruption, 75*61c4878aSAndroid Build Coastguard Worker packet loss, packet reordering, rate limiting, latency modeling. 76*61c4878aSAndroid Build Coastguard Worker 77*61c4878aSAndroid Build Coastguard Worker A ``Filter`` implementation should implement the ``process`` method 78*61c4878aSAndroid Build Coastguard Worker and call ``self.send_data()`` when it has data to send. 79*61c4878aSAndroid Build Coastguard Worker """ 80*61c4878aSAndroid Build Coastguard Worker 81*61c4878aSAndroid Build Coastguard Worker def __init__(self, send_data: Callable[[bytes], Awaitable[None]]): 82*61c4878aSAndroid Build Coastguard Worker self.send_data = send_data 83*61c4878aSAndroid Build Coastguard Worker 84*61c4878aSAndroid Build Coastguard Worker @abc.abstractmethod 85*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 86*61c4878aSAndroid Build Coastguard Worker """Processes incoming data. 87*61c4878aSAndroid Build Coastguard Worker 88*61c4878aSAndroid Build Coastguard Worker Implementations of this method may send arbitrary data, or none, using 89*61c4878aSAndroid Build Coastguard Worker the ``self.send_data()`` handler. 90*61c4878aSAndroid Build Coastguard Worker """ 91*61c4878aSAndroid Build Coastguard Worker 92*61c4878aSAndroid Build Coastguard Worker async def __call__(self, data: bytes) -> None: 93*61c4878aSAndroid Build Coastguard Worker await self.process(data) 94*61c4878aSAndroid Build Coastguard Worker 95*61c4878aSAndroid Build Coastguard Worker 96*61c4878aSAndroid Build Coastguard Workerclass HdlcPacketizer(Filter): 97*61c4878aSAndroid Build Coastguard Worker """A filter which aggregates data into complete HDLC packets. 98*61c4878aSAndroid Build Coastguard Worker 99*61c4878aSAndroid Build Coastguard Worker Since the proxy transport (SOCK_STREAM) has no framing and we want some 100*61c4878aSAndroid Build Coastguard Worker filters to operates on whole frames, this filter can be used so that 101*61c4878aSAndroid Build Coastguard Worker downstream filters see whole frames. 102*61c4878aSAndroid Build Coastguard Worker """ 103*61c4878aSAndroid Build Coastguard Worker 104*61c4878aSAndroid Build Coastguard Worker def __init__(self, send_data: Callable[[bytes], Awaitable[None]]): 105*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 106*61c4878aSAndroid Build Coastguard Worker self.decoder = decode.FrameDecoder() 107*61c4878aSAndroid Build Coastguard Worker 108*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 109*61c4878aSAndroid Build Coastguard Worker for frame in self.decoder.process(data): 110*61c4878aSAndroid Build Coastguard Worker await self.send_data(frame.raw_encoded) 111*61c4878aSAndroid Build Coastguard Worker 112*61c4878aSAndroid Build Coastguard Worker 113*61c4878aSAndroid Build Coastguard Workerclass DataDropper(Filter): 114*61c4878aSAndroid Build Coastguard Worker """A filter which drops some data. 115*61c4878aSAndroid Build Coastguard Worker 116*61c4878aSAndroid Build Coastguard Worker DataDropper will drop data passed through ``process()`` at the 117*61c4878aSAndroid Build Coastguard Worker specified ``rate``. 118*61c4878aSAndroid Build Coastguard Worker """ 119*61c4878aSAndroid Build Coastguard Worker 120*61c4878aSAndroid Build Coastguard Worker def __init__( 121*61c4878aSAndroid Build Coastguard Worker self, 122*61c4878aSAndroid Build Coastguard Worker send_data: Callable[[bytes], Awaitable[None]], 123*61c4878aSAndroid Build Coastguard Worker name: str, 124*61c4878aSAndroid Build Coastguard Worker rate: float, 125*61c4878aSAndroid Build Coastguard Worker seed: int | None = None, 126*61c4878aSAndroid Build Coastguard Worker ): 127*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 128*61c4878aSAndroid Build Coastguard Worker self._rate = rate 129*61c4878aSAndroid Build Coastguard Worker self._name = name 130*61c4878aSAndroid Build Coastguard Worker if seed == None: 131*61c4878aSAndroid Build Coastguard Worker seed = time.time_ns() 132*61c4878aSAndroid Build Coastguard Worker self._rng = random.Random(seed) 133*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{name} DataDropper initialized with seed {seed}') 134*61c4878aSAndroid Build Coastguard Worker 135*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 136*61c4878aSAndroid Build Coastguard Worker if self._rng.uniform(0.0, 1.0) < self._rate: 137*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{self._name} dropped {len(data)} bytes of data') 138*61c4878aSAndroid Build Coastguard Worker else: 139*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 140*61c4878aSAndroid Build Coastguard Worker 141*61c4878aSAndroid Build Coastguard Worker 142*61c4878aSAndroid Build Coastguard Workerclass KeepDropQueue(Filter): 143*61c4878aSAndroid Build Coastguard Worker """A filter which alternates between sending packets and dropping packets. 144*61c4878aSAndroid Build Coastguard Worker 145*61c4878aSAndroid Build Coastguard Worker A KeepDropQueue filter will alternate between keeping packets and dropping 146*61c4878aSAndroid Build Coastguard Worker chunks of data based on a keep/drop queue provided during its creation. The 147*61c4878aSAndroid Build Coastguard Worker queue is looped over unless a negative element is found. A negative number 148*61c4878aSAndroid Build Coastguard Worker is effectively the same as a value of infinity. 149*61c4878aSAndroid Build Coastguard Worker 150*61c4878aSAndroid Build Coastguard Worker This filter is typically most practical when used with a packetizer so data 151*61c4878aSAndroid Build Coastguard Worker can be dropped as distinct packets. 152*61c4878aSAndroid Build Coastguard Worker 153*61c4878aSAndroid Build Coastguard Worker Examples: 154*61c4878aSAndroid Build Coastguard Worker 155*61c4878aSAndroid Build Coastguard Worker keep_drop_queue = [3, 2]: 156*61c4878aSAndroid Build Coastguard Worker Keeps 3 packets, 157*61c4878aSAndroid Build Coastguard Worker Drops 2 packets, 158*61c4878aSAndroid Build Coastguard Worker Keeps 3 packets, 159*61c4878aSAndroid Build Coastguard Worker Drops 2 packets, 160*61c4878aSAndroid Build Coastguard Worker ... [loops indefinitely] 161*61c4878aSAndroid Build Coastguard Worker 162*61c4878aSAndroid Build Coastguard Worker keep_drop_queue = [5, 99, 1, -1]: 163*61c4878aSAndroid Build Coastguard Worker Keeps 5 packets, 164*61c4878aSAndroid Build Coastguard Worker Drops 99 packets, 165*61c4878aSAndroid Build Coastguard Worker Keeps 1 packet, 166*61c4878aSAndroid Build Coastguard Worker Drops all further packets. 167*61c4878aSAndroid Build Coastguard Worker """ 168*61c4878aSAndroid Build Coastguard Worker 169*61c4878aSAndroid Build Coastguard Worker def __init__( 170*61c4878aSAndroid Build Coastguard Worker self, 171*61c4878aSAndroid Build Coastguard Worker send_data: Callable[[bytes], Awaitable[None]], 172*61c4878aSAndroid Build Coastguard Worker name: str, 173*61c4878aSAndroid Build Coastguard Worker keep_drop_queue: Iterable[int], 174*61c4878aSAndroid Build Coastguard Worker only_consider_transfer_chunks: bool = False, 175*61c4878aSAndroid Build Coastguard Worker ): 176*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 177*61c4878aSAndroid Build Coastguard Worker self._keep_drop_queue = list(keep_drop_queue) 178*61c4878aSAndroid Build Coastguard Worker self._loop_idx = 0 179*61c4878aSAndroid Build Coastguard Worker self._current_count = self._keep_drop_queue[0] 180*61c4878aSAndroid Build Coastguard Worker self._keep = True 181*61c4878aSAndroid Build Coastguard Worker self._name = name 182*61c4878aSAndroid Build Coastguard Worker self._only_consider_transfer_chunks = only_consider_transfer_chunks 183*61c4878aSAndroid Build Coastguard Worker 184*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 185*61c4878aSAndroid Build Coastguard Worker if self._only_consider_transfer_chunks: 186*61c4878aSAndroid Build Coastguard Worker try: 187*61c4878aSAndroid Build Coastguard Worker _extract_transfer_chunk(data) 188*61c4878aSAndroid Build Coastguard Worker except Exception: 189*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 190*61c4878aSAndroid Build Coastguard Worker return 191*61c4878aSAndroid Build Coastguard Worker 192*61c4878aSAndroid Build Coastguard Worker # Move forward through the queue if needed. 193*61c4878aSAndroid Build Coastguard Worker while self._current_count == 0: 194*61c4878aSAndroid Build Coastguard Worker self._loop_idx += 1 195*61c4878aSAndroid Build Coastguard Worker self._current_count = self._keep_drop_queue[ 196*61c4878aSAndroid Build Coastguard Worker self._loop_idx % len(self._keep_drop_queue) 197*61c4878aSAndroid Build Coastguard Worker ] 198*61c4878aSAndroid Build Coastguard Worker self._keep = not self._keep 199*61c4878aSAndroid Build Coastguard Worker 200*61c4878aSAndroid Build Coastguard Worker if self._current_count > 0: 201*61c4878aSAndroid Build Coastguard Worker self._current_count -= 1 202*61c4878aSAndroid Build Coastguard Worker 203*61c4878aSAndroid Build Coastguard Worker if self._keep: 204*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 205*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{self._name} forwarded {len(data)} bytes of data') 206*61c4878aSAndroid Build Coastguard Worker else: 207*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{self._name} dropped {len(data)} bytes of data') 208*61c4878aSAndroid Build Coastguard Worker 209*61c4878aSAndroid Build Coastguard Worker 210*61c4878aSAndroid Build Coastguard Workerclass RateLimiter(Filter): 211*61c4878aSAndroid Build Coastguard Worker """A filter which limits transmission rate. 212*61c4878aSAndroid Build Coastguard Worker 213*61c4878aSAndroid Build Coastguard Worker This filter delays transmission of data by len(data)/rate. 214*61c4878aSAndroid Build Coastguard Worker """ 215*61c4878aSAndroid Build Coastguard Worker 216*61c4878aSAndroid Build Coastguard Worker def __init__( 217*61c4878aSAndroid Build Coastguard Worker self, send_data: Callable[[bytes], Awaitable[None]], rate: float 218*61c4878aSAndroid Build Coastguard Worker ): 219*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 220*61c4878aSAndroid Build Coastguard Worker self._rate = rate 221*61c4878aSAndroid Build Coastguard Worker 222*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 223*61c4878aSAndroid Build Coastguard Worker delay = len(data) / self._rate 224*61c4878aSAndroid Build Coastguard Worker await asyncio.sleep(delay) 225*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 226*61c4878aSAndroid Build Coastguard Worker 227*61c4878aSAndroid Build Coastguard Worker 228*61c4878aSAndroid Build Coastguard Workerclass DataTransposer(Filter): 229*61c4878aSAndroid Build Coastguard Worker """A filter which occasionally transposes two chunks of data. 230*61c4878aSAndroid Build Coastguard Worker 231*61c4878aSAndroid Build Coastguard Worker This filter transposes data at the specified rate. It does this by 232*61c4878aSAndroid Build Coastguard Worker holding a chunk to transpose until another chunk arrives. The filter 233*61c4878aSAndroid Build Coastguard Worker will not hold a chunk longer than ``timeout`` seconds. 234*61c4878aSAndroid Build Coastguard Worker """ 235*61c4878aSAndroid Build Coastguard Worker 236*61c4878aSAndroid Build Coastguard Worker def __init__( 237*61c4878aSAndroid Build Coastguard Worker self, 238*61c4878aSAndroid Build Coastguard Worker send_data: Callable[[bytes], Awaitable[None]], 239*61c4878aSAndroid Build Coastguard Worker name: str, 240*61c4878aSAndroid Build Coastguard Worker rate: float, 241*61c4878aSAndroid Build Coastguard Worker timeout: float, 242*61c4878aSAndroid Build Coastguard Worker seed: int, 243*61c4878aSAndroid Build Coastguard Worker ): 244*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 245*61c4878aSAndroid Build Coastguard Worker self._name = name 246*61c4878aSAndroid Build Coastguard Worker self._rate = rate 247*61c4878aSAndroid Build Coastguard Worker self._timeout = timeout 248*61c4878aSAndroid Build Coastguard Worker self._data_queue = asyncio.Queue() 249*61c4878aSAndroid Build Coastguard Worker self._rng = random.Random(seed) 250*61c4878aSAndroid Build Coastguard Worker self._transpose_task = asyncio.create_task(self._transpose_handler()) 251*61c4878aSAndroid Build Coastguard Worker 252*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{name} DataTranspose initialized with seed {seed}') 253*61c4878aSAndroid Build Coastguard Worker 254*61c4878aSAndroid Build Coastguard Worker def __del__(self): 255*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{self._name} cleaning up transpose task.') 256*61c4878aSAndroid Build Coastguard Worker self._transpose_task.cancel() 257*61c4878aSAndroid Build Coastguard Worker 258*61c4878aSAndroid Build Coastguard Worker async def _transpose_handler(self): 259*61c4878aSAndroid Build Coastguard Worker """Async task that handles the packet transposition and timeouts""" 260*61c4878aSAndroid Build Coastguard Worker held_data: bytes | None = None 261*61c4878aSAndroid Build Coastguard Worker while True: 262*61c4878aSAndroid Build Coastguard Worker # Only use timeout if we have data held for transposition 263*61c4878aSAndroid Build Coastguard Worker timeout = None if held_data is None else self._timeout 264*61c4878aSAndroid Build Coastguard Worker try: 265*61c4878aSAndroid Build Coastguard Worker data = await asyncio.wait_for( 266*61c4878aSAndroid Build Coastguard Worker self._data_queue.get(), timeout=timeout 267*61c4878aSAndroid Build Coastguard Worker ) 268*61c4878aSAndroid Build Coastguard Worker 269*61c4878aSAndroid Build Coastguard Worker if held_data is not None: 270*61c4878aSAndroid Build Coastguard Worker # If we have held data, send it out of order. 271*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 272*61c4878aSAndroid Build Coastguard Worker await self.send_data(held_data) 273*61c4878aSAndroid Build Coastguard Worker held_data = None 274*61c4878aSAndroid Build Coastguard Worker else: 275*61c4878aSAndroid Build Coastguard Worker # Otherwise decide if we should transpose the current data. 276*61c4878aSAndroid Build Coastguard Worker if self._rng.uniform(0.0, 1.0) < self._rate: 277*61c4878aSAndroid Build Coastguard Worker _LOG.info( 278*61c4878aSAndroid Build Coastguard Worker f'{self._name} transposing {len(data)} bytes of data' 279*61c4878aSAndroid Build Coastguard Worker ) 280*61c4878aSAndroid Build Coastguard Worker held_data = data 281*61c4878aSAndroid Build Coastguard Worker else: 282*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 283*61c4878aSAndroid Build Coastguard Worker 284*61c4878aSAndroid Build Coastguard Worker except asyncio.TimeoutError: 285*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{self._name} sending data in order due to timeout') 286*61c4878aSAndroid Build Coastguard Worker await self.send_data(held_data) 287*61c4878aSAndroid Build Coastguard Worker held_data = None 288*61c4878aSAndroid Build Coastguard Worker 289*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 290*61c4878aSAndroid Build Coastguard Worker # Queue data for processing by the transpose task. 291*61c4878aSAndroid Build Coastguard Worker await self._data_queue.put(data) 292*61c4878aSAndroid Build Coastguard Worker 293*61c4878aSAndroid Build Coastguard Worker 294*61c4878aSAndroid Build Coastguard Workerclass ServerFailure(Filter): 295*61c4878aSAndroid Build Coastguard Worker """A filter to simulate the server stopping sending packets. 296*61c4878aSAndroid Build Coastguard Worker 297*61c4878aSAndroid Build Coastguard Worker ServerFailure takes a list of numbers of packets to send before 298*61c4878aSAndroid Build Coastguard Worker dropping all subsequent packets until a TRANSFER_START packet 299*61c4878aSAndroid Build Coastguard Worker is seen. This process is repeated for each element in 300*61c4878aSAndroid Build Coastguard Worker packets_before_failure. After that list is exhausted, ServerFailure 301*61c4878aSAndroid Build Coastguard Worker will send all packets. 302*61c4878aSAndroid Build Coastguard Worker 303*61c4878aSAndroid Build Coastguard Worker This filter should be instantiated in the same filter stack as an 304*61c4878aSAndroid Build Coastguard Worker HdlcPacketizer so that EventFilter can decode complete packets. 305*61c4878aSAndroid Build Coastguard Worker """ 306*61c4878aSAndroid Build Coastguard Worker 307*61c4878aSAndroid Build Coastguard Worker def __init__( 308*61c4878aSAndroid Build Coastguard Worker self, 309*61c4878aSAndroid Build Coastguard Worker send_data: Callable[[bytes], Awaitable[None]], 310*61c4878aSAndroid Build Coastguard Worker name: str, 311*61c4878aSAndroid Build Coastguard Worker packets_before_failure_list: list[int], 312*61c4878aSAndroid Build Coastguard Worker start_immediately: bool = False, 313*61c4878aSAndroid Build Coastguard Worker only_consider_transfer_chunks: bool = False, 314*61c4878aSAndroid Build Coastguard Worker ): 315*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 316*61c4878aSAndroid Build Coastguard Worker self._name = name 317*61c4878aSAndroid Build Coastguard Worker self._relay_packets = True 318*61c4878aSAndroid Build Coastguard Worker self._packets_before_failure_list = packets_before_failure_list 319*61c4878aSAndroid Build Coastguard Worker self._packets_before_failure = None 320*61c4878aSAndroid Build Coastguard Worker self._only_consider_transfer_chunks = only_consider_transfer_chunks 321*61c4878aSAndroid Build Coastguard Worker if start_immediately: 322*61c4878aSAndroid Build Coastguard Worker self.advance_packets_before_failure() 323*61c4878aSAndroid Build Coastguard Worker 324*61c4878aSAndroid Build Coastguard Worker def advance_packets_before_failure(self): 325*61c4878aSAndroid Build Coastguard Worker if len(self._packets_before_failure_list) > 0: 326*61c4878aSAndroid Build Coastguard Worker self._packets_before_failure = ( 327*61c4878aSAndroid Build Coastguard Worker self._packets_before_failure_list.pop(0) 328*61c4878aSAndroid Build Coastguard Worker ) 329*61c4878aSAndroid Build Coastguard Worker else: 330*61c4878aSAndroid Build Coastguard Worker self._packets_before_failure = None 331*61c4878aSAndroid Build Coastguard Worker 332*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 333*61c4878aSAndroid Build Coastguard Worker if self._only_consider_transfer_chunks: 334*61c4878aSAndroid Build Coastguard Worker try: 335*61c4878aSAndroid Build Coastguard Worker _extract_transfer_chunk(data) 336*61c4878aSAndroid Build Coastguard Worker except Exception: 337*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 338*61c4878aSAndroid Build Coastguard Worker return 339*61c4878aSAndroid Build Coastguard Worker 340*61c4878aSAndroid Build Coastguard Worker if self._packets_before_failure is None: 341*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 342*61c4878aSAndroid Build Coastguard Worker elif self._packets_before_failure > 0: 343*61c4878aSAndroid Build Coastguard Worker self._packets_before_failure -= 1 344*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 345*61c4878aSAndroid Build Coastguard Worker 346*61c4878aSAndroid Build Coastguard Worker def handle_event(self, event: Event) -> None: 347*61c4878aSAndroid Build Coastguard Worker if event.type is EventType.TRANSFER_START: 348*61c4878aSAndroid Build Coastguard Worker self.advance_packets_before_failure() 349*61c4878aSAndroid Build Coastguard Worker 350*61c4878aSAndroid Build Coastguard Worker 351*61c4878aSAndroid Build Coastguard Workerclass WindowPacketDropper(Filter): 352*61c4878aSAndroid Build Coastguard Worker """A filter to allow the same packet in each window to be dropped. 353*61c4878aSAndroid Build Coastguard Worker 354*61c4878aSAndroid Build Coastguard Worker WindowPacketDropper with drop the nth packet in each window as 355*61c4878aSAndroid Build Coastguard Worker specified by window_packet_to_drop. This process will happen 356*61c4878aSAndroid Build Coastguard Worker indefinitely for each window. 357*61c4878aSAndroid Build Coastguard Worker 358*61c4878aSAndroid Build Coastguard Worker This filter should be instantiated in the same filter stack as an 359*61c4878aSAndroid Build Coastguard Worker HdlcPacketizer so that EventFilter can decode complete packets. 360*61c4878aSAndroid Build Coastguard Worker """ 361*61c4878aSAndroid Build Coastguard Worker 362*61c4878aSAndroid Build Coastguard Worker def __init__( 363*61c4878aSAndroid Build Coastguard Worker self, 364*61c4878aSAndroid Build Coastguard Worker send_data: Callable[[bytes], Awaitable[None]], 365*61c4878aSAndroid Build Coastguard Worker name: str, 366*61c4878aSAndroid Build Coastguard Worker window_packet_to_drop: int, 367*61c4878aSAndroid Build Coastguard Worker ): 368*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 369*61c4878aSAndroid Build Coastguard Worker self._name = name 370*61c4878aSAndroid Build Coastguard Worker self._relay_packets = True 371*61c4878aSAndroid Build Coastguard Worker self._window_packet_to_drop = window_packet_to_drop 372*61c4878aSAndroid Build Coastguard Worker self._next_window_start_offset: int | None = 0 373*61c4878aSAndroid Build Coastguard Worker self._window_packet = 0 374*61c4878aSAndroid Build Coastguard Worker 375*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 376*61c4878aSAndroid Build Coastguard Worker data_chunk = None 377*61c4878aSAndroid Build Coastguard Worker try: 378*61c4878aSAndroid Build Coastguard Worker chunk = _extract_transfer_chunk(data) 379*61c4878aSAndroid Build Coastguard Worker if chunk.type is Chunk.Type.DATA: 380*61c4878aSAndroid Build Coastguard Worker data_chunk = chunk 381*61c4878aSAndroid Build Coastguard Worker except Exception: 382*61c4878aSAndroid Build Coastguard Worker # Invalid / non-chunk data (e.g. text logs); ignore. 383*61c4878aSAndroid Build Coastguard Worker pass 384*61c4878aSAndroid Build Coastguard Worker 385*61c4878aSAndroid Build Coastguard Worker # Only count transfer data chunks as part of a window. 386*61c4878aSAndroid Build Coastguard Worker if data_chunk is not None: 387*61c4878aSAndroid Build Coastguard Worker if data_chunk.offset == self._next_window_start_offset: 388*61c4878aSAndroid Build Coastguard Worker # If a new window has been requested, wait until the first 389*61c4878aSAndroid Build Coastguard Worker # chunk matching its requested offset to begin counting window 390*61c4878aSAndroid Build Coastguard Worker # chunks. Any in-flight chunks from the previous window are 391*61c4878aSAndroid Build Coastguard Worker # allowed through. 392*61c4878aSAndroid Build Coastguard Worker self._window_packet = 0 393*61c4878aSAndroid Build Coastguard Worker self._next_window_start_offset = None 394*61c4878aSAndroid Build Coastguard Worker 395*61c4878aSAndroid Build Coastguard Worker if self._window_packet != self._window_packet_to_drop: 396*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 397*61c4878aSAndroid Build Coastguard Worker 398*61c4878aSAndroid Build Coastguard Worker self._window_packet += 1 399*61c4878aSAndroid Build Coastguard Worker else: 400*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 401*61c4878aSAndroid Build Coastguard Worker 402*61c4878aSAndroid Build Coastguard Worker def handle_event(self, event: Event) -> None: 403*61c4878aSAndroid Build Coastguard Worker if event.type in ( 404*61c4878aSAndroid Build Coastguard Worker EventType.PARAMETERS_RETRANSMIT, 405*61c4878aSAndroid Build Coastguard Worker EventType.PARAMETERS_CONTINUE, 406*61c4878aSAndroid Build Coastguard Worker EventType.START_ACK_CONFIRMATION, 407*61c4878aSAndroid Build Coastguard Worker ): 408*61c4878aSAndroid Build Coastguard Worker # A new transmission window has been requested, starting at the 409*61c4878aSAndroid Build Coastguard Worker # offset specified in the chunk. The receiver may already have data 410*61c4878aSAndroid Build Coastguard Worker # from the previous window in-flight, so don't immediately reset 411*61c4878aSAndroid Build Coastguard Worker # the window packet counter. 412*61c4878aSAndroid Build Coastguard Worker self._next_window_start_offset = event.chunk.offset 413*61c4878aSAndroid Build Coastguard Worker 414*61c4878aSAndroid Build Coastguard Worker 415*61c4878aSAndroid Build Coastguard Workerclass EventFilter(Filter): 416*61c4878aSAndroid Build Coastguard Worker """A filter that inspects packets and send events to other filters. 417*61c4878aSAndroid Build Coastguard Worker 418*61c4878aSAndroid Build Coastguard Worker This filter should be instantiated in the same filter stack as an 419*61c4878aSAndroid Build Coastguard Worker HdlcPacketizer so that it can decode complete packets. 420*61c4878aSAndroid Build Coastguard Worker """ 421*61c4878aSAndroid Build Coastguard Worker 422*61c4878aSAndroid Build Coastguard Worker def __init__( 423*61c4878aSAndroid Build Coastguard Worker self, 424*61c4878aSAndroid Build Coastguard Worker send_data: Callable[[bytes], Awaitable[None]], 425*61c4878aSAndroid Build Coastguard Worker name: str, 426*61c4878aSAndroid Build Coastguard Worker event_queue: asyncio.Queue, 427*61c4878aSAndroid Build Coastguard Worker ): 428*61c4878aSAndroid Build Coastguard Worker super().__init__(send_data) 429*61c4878aSAndroid Build Coastguard Worker self._name = name 430*61c4878aSAndroid Build Coastguard Worker self._queue = event_queue 431*61c4878aSAndroid Build Coastguard Worker 432*61c4878aSAndroid Build Coastguard Worker async def process(self, data: bytes) -> None: 433*61c4878aSAndroid Build Coastguard Worker try: 434*61c4878aSAndroid Build Coastguard Worker chunk = _extract_transfer_chunk(data) 435*61c4878aSAndroid Build Coastguard Worker if chunk.type is Chunk.Type.START: 436*61c4878aSAndroid Build Coastguard Worker await self._queue.put(Event(EventType.TRANSFER_START, chunk)) 437*61c4878aSAndroid Build Coastguard Worker if chunk.type is Chunk.Type.START_ACK_CONFIRMATION: 438*61c4878aSAndroid Build Coastguard Worker await self._queue.put( 439*61c4878aSAndroid Build Coastguard Worker Event(EventType.START_ACK_CONFIRMATION, chunk) 440*61c4878aSAndroid Build Coastguard Worker ) 441*61c4878aSAndroid Build Coastguard Worker elif chunk.type is Chunk.Type.PARAMETERS_RETRANSMIT: 442*61c4878aSAndroid Build Coastguard Worker await self._queue.put( 443*61c4878aSAndroid Build Coastguard Worker Event(EventType.PARAMETERS_RETRANSMIT, chunk) 444*61c4878aSAndroid Build Coastguard Worker ) 445*61c4878aSAndroid Build Coastguard Worker elif chunk.type is Chunk.Type.PARAMETERS_CONTINUE: 446*61c4878aSAndroid Build Coastguard Worker await self._queue.put( 447*61c4878aSAndroid Build Coastguard Worker Event(EventType.PARAMETERS_CONTINUE, chunk) 448*61c4878aSAndroid Build Coastguard Worker ) 449*61c4878aSAndroid Build Coastguard Worker except: 450*61c4878aSAndroid Build Coastguard Worker # Silently ignore invalid packets 451*61c4878aSAndroid Build Coastguard Worker pass 452*61c4878aSAndroid Build Coastguard Worker 453*61c4878aSAndroid Build Coastguard Worker await self.send_data(data) 454*61c4878aSAndroid Build Coastguard Worker 455*61c4878aSAndroid Build Coastguard Worker 456*61c4878aSAndroid Build Coastguard Workerdef _extract_transfer_chunk(data: bytes) -> Chunk: 457*61c4878aSAndroid Build Coastguard Worker """Gets a transfer Chunk from an HDLC frame containing an RPC packet. 458*61c4878aSAndroid Build Coastguard Worker 459*61c4878aSAndroid Build Coastguard Worker Raises an exception if a valid chunk does not exist. 460*61c4878aSAndroid Build Coastguard Worker """ 461*61c4878aSAndroid Build Coastguard Worker 462*61c4878aSAndroid Build Coastguard Worker decoder = decode.FrameDecoder() 463*61c4878aSAndroid Build Coastguard Worker for frame in decoder.process(data): 464*61c4878aSAndroid Build Coastguard Worker packet = packet_pb2.RpcPacket() 465*61c4878aSAndroid Build Coastguard Worker packet.ParseFromString(frame.data) 466*61c4878aSAndroid Build Coastguard Worker 467*61c4878aSAndroid Build Coastguard Worker if packet.payload: 468*61c4878aSAndroid Build Coastguard Worker raw_chunk = transfer_pb2.Chunk() 469*61c4878aSAndroid Build Coastguard Worker raw_chunk.ParseFromString(packet.payload) 470*61c4878aSAndroid Build Coastguard Worker return Chunk.from_message(raw_chunk) 471*61c4878aSAndroid Build Coastguard Worker 472*61c4878aSAndroid Build Coastguard Worker # The incoming data is expected to be HDLC-packetized, so only one 473*61c4878aSAndroid Build Coastguard Worker # frame should exist. 474*61c4878aSAndroid Build Coastguard Worker break 475*61c4878aSAndroid Build Coastguard Worker 476*61c4878aSAndroid Build Coastguard Worker raise ValueError("Invalid transfer chunk frame") 477*61c4878aSAndroid Build Coastguard Worker 478*61c4878aSAndroid Build Coastguard Worker 479*61c4878aSAndroid Build Coastguard Workerasync def _handle_simplex_events( 480*61c4878aSAndroid Build Coastguard Worker event_queue: asyncio.Queue, handlers: list[Callable[[Event], None]] 481*61c4878aSAndroid Build Coastguard Worker): 482*61c4878aSAndroid Build Coastguard Worker while True: 483*61c4878aSAndroid Build Coastguard Worker event = await event_queue.get() 484*61c4878aSAndroid Build Coastguard Worker for handler in handlers: 485*61c4878aSAndroid Build Coastguard Worker handler(event) 486*61c4878aSAndroid Build Coastguard Worker 487*61c4878aSAndroid Build Coastguard Worker 488*61c4878aSAndroid Build Coastguard Workerasync def _handle_simplex_connection( 489*61c4878aSAndroid Build Coastguard Worker name: str, 490*61c4878aSAndroid Build Coastguard Worker filter_stack_config: list[config_pb2.FilterConfig], 491*61c4878aSAndroid Build Coastguard Worker reader: asyncio.StreamReader, 492*61c4878aSAndroid Build Coastguard Worker writer: asyncio.StreamWriter, 493*61c4878aSAndroid Build Coastguard Worker inbound_event_queue: asyncio.Queue, 494*61c4878aSAndroid Build Coastguard Worker outbound_event_queue: asyncio.Queue, 495*61c4878aSAndroid Build Coastguard Worker) -> None: 496*61c4878aSAndroid Build Coastguard Worker """Handle a single direction of a bidirectional connection between 497*61c4878aSAndroid Build Coastguard Worker server and client.""" 498*61c4878aSAndroid Build Coastguard Worker 499*61c4878aSAndroid Build Coastguard Worker async def send(data: bytes): 500*61c4878aSAndroid Build Coastguard Worker writer.write(data) 501*61c4878aSAndroid Build Coastguard Worker await writer.drain() 502*61c4878aSAndroid Build Coastguard Worker 503*61c4878aSAndroid Build Coastguard Worker filter_stack = EventFilter(send, name, outbound_event_queue) 504*61c4878aSAndroid Build Coastguard Worker 505*61c4878aSAndroid Build Coastguard Worker event_handlers: list[Callable[[Event], None]] = [] 506*61c4878aSAndroid Build Coastguard Worker 507*61c4878aSAndroid Build Coastguard Worker # Build the filter stack from the bottom up 508*61c4878aSAndroid Build Coastguard Worker for config in reversed(filter_stack_config): 509*61c4878aSAndroid Build Coastguard Worker filter_name = config.WhichOneof("filter") 510*61c4878aSAndroid Build Coastguard Worker if filter_name == "hdlc_packetizer": 511*61c4878aSAndroid Build Coastguard Worker filter_stack = HdlcPacketizer(filter_stack) 512*61c4878aSAndroid Build Coastguard Worker elif filter_name == "data_dropper": 513*61c4878aSAndroid Build Coastguard Worker data_dropper = config.data_dropper 514*61c4878aSAndroid Build Coastguard Worker filter_stack = DataDropper( 515*61c4878aSAndroid Build Coastguard Worker filter_stack, name, data_dropper.rate, data_dropper.seed 516*61c4878aSAndroid Build Coastguard Worker ) 517*61c4878aSAndroid Build Coastguard Worker elif filter_name == "rate_limiter": 518*61c4878aSAndroid Build Coastguard Worker filter_stack = RateLimiter(filter_stack, config.rate_limiter.rate) 519*61c4878aSAndroid Build Coastguard Worker elif filter_name == "data_transposer": 520*61c4878aSAndroid Build Coastguard Worker transposer = config.data_transposer 521*61c4878aSAndroid Build Coastguard Worker filter_stack = DataTransposer( 522*61c4878aSAndroid Build Coastguard Worker filter_stack, 523*61c4878aSAndroid Build Coastguard Worker name, 524*61c4878aSAndroid Build Coastguard Worker transposer.rate, 525*61c4878aSAndroid Build Coastguard Worker transposer.timeout, 526*61c4878aSAndroid Build Coastguard Worker transposer.seed, 527*61c4878aSAndroid Build Coastguard Worker ) 528*61c4878aSAndroid Build Coastguard Worker elif filter_name == "server_failure": 529*61c4878aSAndroid Build Coastguard Worker server_failure = config.server_failure 530*61c4878aSAndroid Build Coastguard Worker filter_stack = ServerFailure( 531*61c4878aSAndroid Build Coastguard Worker filter_stack, 532*61c4878aSAndroid Build Coastguard Worker name, 533*61c4878aSAndroid Build Coastguard Worker server_failure.packets_before_failure, 534*61c4878aSAndroid Build Coastguard Worker server_failure.start_immediately, 535*61c4878aSAndroid Build Coastguard Worker server_failure.only_consider_transfer_chunks, 536*61c4878aSAndroid Build Coastguard Worker ) 537*61c4878aSAndroid Build Coastguard Worker event_handlers.append(filter_stack.handle_event) 538*61c4878aSAndroid Build Coastguard Worker elif filter_name == "keep_drop_queue": 539*61c4878aSAndroid Build Coastguard Worker keep_drop_queue = config.keep_drop_queue 540*61c4878aSAndroid Build Coastguard Worker filter_stack = KeepDropQueue( 541*61c4878aSAndroid Build Coastguard Worker filter_stack, 542*61c4878aSAndroid Build Coastguard Worker name, 543*61c4878aSAndroid Build Coastguard Worker keep_drop_queue.keep_drop_queue, 544*61c4878aSAndroid Build Coastguard Worker keep_drop_queue.only_consider_transfer_chunks, 545*61c4878aSAndroid Build Coastguard Worker ) 546*61c4878aSAndroid Build Coastguard Worker elif filter_name == "window_packet_dropper": 547*61c4878aSAndroid Build Coastguard Worker window_packet_dropper = config.window_packet_dropper 548*61c4878aSAndroid Build Coastguard Worker filter_stack = WindowPacketDropper( 549*61c4878aSAndroid Build Coastguard Worker filter_stack, name, window_packet_dropper.window_packet_to_drop 550*61c4878aSAndroid Build Coastguard Worker ) 551*61c4878aSAndroid Build Coastguard Worker event_handlers.append(filter_stack.handle_event) 552*61c4878aSAndroid Build Coastguard Worker else: 553*61c4878aSAndroid Build Coastguard Worker sys.exit(f'Unknown filter {filter_name}') 554*61c4878aSAndroid Build Coastguard Worker 555*61c4878aSAndroid Build Coastguard Worker event_task = asyncio.create_task( 556*61c4878aSAndroid Build Coastguard Worker _handle_simplex_events(inbound_event_queue, event_handlers) 557*61c4878aSAndroid Build Coastguard Worker ) 558*61c4878aSAndroid Build Coastguard Worker 559*61c4878aSAndroid Build Coastguard Worker while True: 560*61c4878aSAndroid Build Coastguard Worker # Arbitrarily chosen "page sized" read. 561*61c4878aSAndroid Build Coastguard Worker data = await reader.read(4096) 562*61c4878aSAndroid Build Coastguard Worker 563*61c4878aSAndroid Build Coastguard Worker # An empty data indicates that the connection is closed. 564*61c4878aSAndroid Build Coastguard Worker if not data: 565*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'{name} connection closed.') 566*61c4878aSAndroid Build Coastguard Worker return 567*61c4878aSAndroid Build Coastguard Worker 568*61c4878aSAndroid Build Coastguard Worker await filter_stack.process(data) 569*61c4878aSAndroid Build Coastguard Worker 570*61c4878aSAndroid Build Coastguard Worker 571*61c4878aSAndroid Build Coastguard Workerasync def _handle_connection( 572*61c4878aSAndroid Build Coastguard Worker server_port: int, 573*61c4878aSAndroid Build Coastguard Worker config: config_pb2.ProxyConfig, 574*61c4878aSAndroid Build Coastguard Worker client_reader: asyncio.StreamReader, 575*61c4878aSAndroid Build Coastguard Worker client_writer: asyncio.StreamWriter, 576*61c4878aSAndroid Build Coastguard Worker) -> None: 577*61c4878aSAndroid Build Coastguard Worker """Handle a connection between server and client.""" 578*61c4878aSAndroid Build Coastguard Worker 579*61c4878aSAndroid Build Coastguard Worker client_addr = client_writer.get_extra_info('peername') 580*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'New client connection from {client_addr}') 581*61c4878aSAndroid Build Coastguard Worker 582*61c4878aSAndroid Build Coastguard Worker # Open a new connection to the server for each client connection. 583*61c4878aSAndroid Build Coastguard Worker # 584*61c4878aSAndroid Build Coastguard Worker # TODO(konkers): catch exception and close client writer 585*61c4878aSAndroid Build Coastguard Worker server_reader, server_writer = await asyncio.open_connection( 586*61c4878aSAndroid Build Coastguard Worker 'localhost', server_port 587*61c4878aSAndroid Build Coastguard Worker ) 588*61c4878aSAndroid Build Coastguard Worker _LOG.info('New connection opened to server') 589*61c4878aSAndroid Build Coastguard Worker 590*61c4878aSAndroid Build Coastguard Worker # Queues for the simplex connections to pass events to each other. 591*61c4878aSAndroid Build Coastguard Worker server_event_queue = asyncio.Queue() 592*61c4878aSAndroid Build Coastguard Worker client_event_queue = asyncio.Queue() 593*61c4878aSAndroid Build Coastguard Worker 594*61c4878aSAndroid Build Coastguard Worker # Instantiate two simplex handler one for each direction of the connection. 595*61c4878aSAndroid Build Coastguard Worker _, pending = await asyncio.wait( 596*61c4878aSAndroid Build Coastguard Worker [ 597*61c4878aSAndroid Build Coastguard Worker asyncio.create_task( 598*61c4878aSAndroid Build Coastguard Worker _handle_simplex_connection( 599*61c4878aSAndroid Build Coastguard Worker "client", 600*61c4878aSAndroid Build Coastguard Worker config.client_filter_stack, 601*61c4878aSAndroid Build Coastguard Worker client_reader, 602*61c4878aSAndroid Build Coastguard Worker server_writer, 603*61c4878aSAndroid Build Coastguard Worker server_event_queue, 604*61c4878aSAndroid Build Coastguard Worker client_event_queue, 605*61c4878aSAndroid Build Coastguard Worker ) 606*61c4878aSAndroid Build Coastguard Worker ), 607*61c4878aSAndroid Build Coastguard Worker asyncio.create_task( 608*61c4878aSAndroid Build Coastguard Worker _handle_simplex_connection( 609*61c4878aSAndroid Build Coastguard Worker "server", 610*61c4878aSAndroid Build Coastguard Worker config.server_filter_stack, 611*61c4878aSAndroid Build Coastguard Worker server_reader, 612*61c4878aSAndroid Build Coastguard Worker client_writer, 613*61c4878aSAndroid Build Coastguard Worker client_event_queue, 614*61c4878aSAndroid Build Coastguard Worker server_event_queue, 615*61c4878aSAndroid Build Coastguard Worker ) 616*61c4878aSAndroid Build Coastguard Worker ), 617*61c4878aSAndroid Build Coastguard Worker ], 618*61c4878aSAndroid Build Coastguard Worker return_when=asyncio.FIRST_COMPLETED, 619*61c4878aSAndroid Build Coastguard Worker ) 620*61c4878aSAndroid Build Coastguard Worker 621*61c4878aSAndroid Build Coastguard Worker # When one side terminates the connection, also terminate the other side 622*61c4878aSAndroid Build Coastguard Worker for task in pending: 623*61c4878aSAndroid Build Coastguard Worker task.cancel() 624*61c4878aSAndroid Build Coastguard Worker 625*61c4878aSAndroid Build Coastguard Worker for stream in [client_writer, server_writer]: 626*61c4878aSAndroid Build Coastguard Worker stream.close() 627*61c4878aSAndroid Build Coastguard Worker 628*61c4878aSAndroid Build Coastguard Worker 629*61c4878aSAndroid Build Coastguard Workerdef _parse_args() -> argparse.Namespace: 630*61c4878aSAndroid Build Coastguard Worker parser = argparse.ArgumentParser( 631*61c4878aSAndroid Build Coastguard Worker description=__doc__, 632*61c4878aSAndroid Build Coastguard Worker formatter_class=argparse.RawDescriptionHelpFormatter, 633*61c4878aSAndroid Build Coastguard Worker ) 634*61c4878aSAndroid Build Coastguard Worker 635*61c4878aSAndroid Build Coastguard Worker parser.add_argument( 636*61c4878aSAndroid Build Coastguard Worker '--server-port', 637*61c4878aSAndroid Build Coastguard Worker type=int, 638*61c4878aSAndroid Build Coastguard Worker required=True, 639*61c4878aSAndroid Build Coastguard Worker help='Port of the integration test server. The proxy will forward connections to this port', 640*61c4878aSAndroid Build Coastguard Worker ) 641*61c4878aSAndroid Build Coastguard Worker parser.add_argument( 642*61c4878aSAndroid Build Coastguard Worker '--client-port', 643*61c4878aSAndroid Build Coastguard Worker type=int, 644*61c4878aSAndroid Build Coastguard Worker required=True, 645*61c4878aSAndroid Build Coastguard Worker help='Port on which to listen for connections from integration test client.', 646*61c4878aSAndroid Build Coastguard Worker ) 647*61c4878aSAndroid Build Coastguard Worker 648*61c4878aSAndroid Build Coastguard Worker return parser.parse_args() 649*61c4878aSAndroid Build Coastguard Worker 650*61c4878aSAndroid Build Coastguard Worker 651*61c4878aSAndroid Build Coastguard Workerdef _init_logging(level: int) -> None: 652*61c4878aSAndroid Build Coastguard Worker _LOG.setLevel(logging.DEBUG) 653*61c4878aSAndroid Build Coastguard Worker log_to_stderr = logging.StreamHandler() 654*61c4878aSAndroid Build Coastguard Worker log_to_stderr.setLevel(level) 655*61c4878aSAndroid Build Coastguard Worker log_to_stderr.setFormatter( 656*61c4878aSAndroid Build Coastguard Worker logging.Formatter( 657*61c4878aSAndroid Build Coastguard Worker fmt='%(asctime)s.%(msecs)03d-%(levelname)s: %(message)s', 658*61c4878aSAndroid Build Coastguard Worker datefmt='%H:%M:%S', 659*61c4878aSAndroid Build Coastguard Worker ) 660*61c4878aSAndroid Build Coastguard Worker ) 661*61c4878aSAndroid Build Coastguard Worker 662*61c4878aSAndroid Build Coastguard Worker _LOG.addHandler(log_to_stderr) 663*61c4878aSAndroid Build Coastguard Worker 664*61c4878aSAndroid Build Coastguard Worker 665*61c4878aSAndroid Build Coastguard Workerasync def _main(server_port: int, client_port: int) -> None: 666*61c4878aSAndroid Build Coastguard Worker _init_logging(logging.DEBUG) 667*61c4878aSAndroid Build Coastguard Worker 668*61c4878aSAndroid Build Coastguard Worker # Load config from stdin using synchronous IO 669*61c4878aSAndroid Build Coastguard Worker text_config = sys.stdin.buffer.read() 670*61c4878aSAndroid Build Coastguard Worker 671*61c4878aSAndroid Build Coastguard Worker config = text_format.Parse(text_config, config_pb2.ProxyConfig()) 672*61c4878aSAndroid Build Coastguard Worker 673*61c4878aSAndroid Build Coastguard Worker # Instantiate the TCP server. 674*61c4878aSAndroid Build Coastguard Worker server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 675*61c4878aSAndroid Build Coastguard Worker server_socket.setsockopt( 676*61c4878aSAndroid Build Coastguard Worker socket.SOL_SOCKET, socket.SO_RCVBUF, _RECEIVE_BUFFER_SIZE 677*61c4878aSAndroid Build Coastguard Worker ) 678*61c4878aSAndroid Build Coastguard Worker server_socket.bind(('', client_port)) 679*61c4878aSAndroid Build Coastguard Worker server = await asyncio.start_server( 680*61c4878aSAndroid Build Coastguard Worker lambda reader, writer: _handle_connection( 681*61c4878aSAndroid Build Coastguard Worker server_port, config, reader, writer 682*61c4878aSAndroid Build Coastguard Worker ), 683*61c4878aSAndroid Build Coastguard Worker limit=_RECEIVE_BUFFER_SIZE, 684*61c4878aSAndroid Build Coastguard Worker sock=server_socket, 685*61c4878aSAndroid Build Coastguard Worker ) 686*61c4878aSAndroid Build Coastguard Worker 687*61c4878aSAndroid Build Coastguard Worker addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets) 688*61c4878aSAndroid Build Coastguard Worker _LOG.info(f'Listening for client connection on {addrs}') 689*61c4878aSAndroid Build Coastguard Worker 690*61c4878aSAndroid Build Coastguard Worker # Run the TCP server. 691*61c4878aSAndroid Build Coastguard Worker async with server: 692*61c4878aSAndroid Build Coastguard Worker await server.serve_forever() 693*61c4878aSAndroid Build Coastguard Worker 694*61c4878aSAndroid Build Coastguard Worker 695*61c4878aSAndroid Build Coastguard Workerif __name__ == '__main__': 696*61c4878aSAndroid Build Coastguard Worker asyncio.run(_main(**vars(_parse_args()))) 697