xref: /aosp_15_r20/external/pigweed/pw_transfer/integration_test/proxy.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1*61c4878aSAndroid Build Coastguard Worker#!/usr/bin/env python3
2*61c4878aSAndroid Build Coastguard Worker# Copyright 2022 The Pigweed Authors
3*61c4878aSAndroid Build Coastguard Worker#
4*61c4878aSAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5*61c4878aSAndroid Build Coastguard Worker# use this file except in compliance with the License. You may obtain a copy of
6*61c4878aSAndroid Build Coastguard Worker# the License at
7*61c4878aSAndroid Build Coastguard Worker#
8*61c4878aSAndroid Build Coastguard Worker#     https://www.apache.org/licenses/LICENSE-2.0
9*61c4878aSAndroid Build Coastguard Worker#
10*61c4878aSAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
11*61c4878aSAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12*61c4878aSAndroid Build Coastguard Worker# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13*61c4878aSAndroid Build Coastguard Worker# License for the specific language governing permissions and limitations under
14*61c4878aSAndroid Build Coastguard Worker# the License.
15*61c4878aSAndroid Build Coastguard Worker"""Proxy for transfer integration testing.
16*61c4878aSAndroid Build Coastguard Worker
17*61c4878aSAndroid Build Coastguard WorkerThis module contains a proxy for transfer intergation testing.  It is capable
18*61c4878aSAndroid Build Coastguard Workerof introducing various link failures into the connection between the client and
19*61c4878aSAndroid Build Coastguard Workerserver.
20*61c4878aSAndroid Build Coastguard Worker"""
21*61c4878aSAndroid Build Coastguard Worker
22*61c4878aSAndroid Build Coastguard Workerimport abc
23*61c4878aSAndroid Build Coastguard Workerimport argparse
24*61c4878aSAndroid Build Coastguard Workerimport asyncio
25*61c4878aSAndroid Build Coastguard Workerfrom enum import Enum
26*61c4878aSAndroid Build Coastguard Workerimport logging
27*61c4878aSAndroid Build Coastguard Workerimport random
28*61c4878aSAndroid Build Coastguard Workerimport socket
29*61c4878aSAndroid Build Coastguard Workerimport sys
30*61c4878aSAndroid Build Coastguard Workerimport time
31*61c4878aSAndroid Build Coastguard Workerfrom typing import Awaitable, Callable, Iterable, NamedTuple
32*61c4878aSAndroid Build Coastguard Worker
33*61c4878aSAndroid Build Coastguard Workerfrom google.protobuf import text_format
34*61c4878aSAndroid Build Coastguard Worker
35*61c4878aSAndroid Build Coastguard Workerfrom pw_rpc.internal import packet_pb2
36*61c4878aSAndroid Build Coastguard Workerfrom pw_transfer import transfer_pb2
37*61c4878aSAndroid Build Coastguard Workerfrom pw_transfer.integration_test import config_pb2
38*61c4878aSAndroid Build Coastguard Workerfrom pw_hdlc import decode
39*61c4878aSAndroid Build Coastguard Workerfrom pw_transfer.chunk import Chunk
40*61c4878aSAndroid Build Coastguard Worker
41*61c4878aSAndroid Build Coastguard Worker_LOG = logging.getLogger('pw_transfer_intergration_test_proxy')
42*61c4878aSAndroid Build Coastguard Worker
43*61c4878aSAndroid Build Coastguard Worker# This is the maximum size of the socket receive buffers. Ideally, this is set
44*61c4878aSAndroid Build Coastguard Worker# to the lowest allowed value to minimize buffering between the proxy and
45*61c4878aSAndroid Build Coastguard Worker# clients so rate limiting causes the client to block and wait for the
46*61c4878aSAndroid Build Coastguard Worker# integration test proxy to drain rather than allowing OS buffers to backlog
47*61c4878aSAndroid Build Coastguard Worker# large quantities of data.
48*61c4878aSAndroid Build Coastguard Worker#
49*61c4878aSAndroid Build Coastguard Worker# Note that the OS may chose to not strictly follow this requested buffer size.
50*61c4878aSAndroid Build Coastguard Worker# Still, setting this value to be relatively small does reduce bufer sizes
51*61c4878aSAndroid Build Coastguard Worker# significantly enough to better reflect typical inter-device communication.
52*61c4878aSAndroid Build Coastguard Worker#
53*61c4878aSAndroid Build Coastguard Worker# For this to be effective, clients should also configure their sockets to a
54*61c4878aSAndroid Build Coastguard Worker# smaller send buffer size.
55*61c4878aSAndroid Build Coastguard Worker_RECEIVE_BUFFER_SIZE = 2048
56*61c4878aSAndroid Build Coastguard Worker
57*61c4878aSAndroid Build Coastguard Worker
58*61c4878aSAndroid Build Coastguard Workerclass EventType(Enum):
59*61c4878aSAndroid Build Coastguard Worker    TRANSFER_START = 1
60*61c4878aSAndroid Build Coastguard Worker    PARAMETERS_RETRANSMIT = 2
61*61c4878aSAndroid Build Coastguard Worker    PARAMETERS_CONTINUE = 3
62*61c4878aSAndroid Build Coastguard Worker    START_ACK_CONFIRMATION = 4
63*61c4878aSAndroid Build Coastguard Worker
64*61c4878aSAndroid Build Coastguard Worker
65*61c4878aSAndroid Build Coastguard Workerclass Event(NamedTuple):
66*61c4878aSAndroid Build Coastguard Worker    type: EventType
67*61c4878aSAndroid Build Coastguard Worker    chunk: Chunk
68*61c4878aSAndroid Build Coastguard Worker
69*61c4878aSAndroid Build Coastguard Worker
70*61c4878aSAndroid Build Coastguard Workerclass Filter(abc.ABC):
71*61c4878aSAndroid Build Coastguard Worker    """An abstract interface for manipulating a stream of data.
72*61c4878aSAndroid Build Coastguard Worker
73*61c4878aSAndroid Build Coastguard Worker    ``Filter``s are used to implement various transforms to simulate real
74*61c4878aSAndroid Build Coastguard Worker    world link properties.  Some examples include: data corruption,
75*61c4878aSAndroid Build Coastguard Worker    packet loss, packet reordering, rate limiting, latency modeling.
76*61c4878aSAndroid Build Coastguard Worker
77*61c4878aSAndroid Build Coastguard Worker    A ``Filter`` implementation should implement the ``process`` method
78*61c4878aSAndroid Build Coastguard Worker    and call ``self.send_data()`` when it has data to send.
79*61c4878aSAndroid Build Coastguard Worker    """
80*61c4878aSAndroid Build Coastguard Worker
81*61c4878aSAndroid Build Coastguard Worker    def __init__(self, send_data: Callable[[bytes], Awaitable[None]]):
82*61c4878aSAndroid Build Coastguard Worker        self.send_data = send_data
83*61c4878aSAndroid Build Coastguard Worker
84*61c4878aSAndroid Build Coastguard Worker    @abc.abstractmethod
85*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
86*61c4878aSAndroid Build Coastguard Worker        """Processes incoming data.
87*61c4878aSAndroid Build Coastguard Worker
88*61c4878aSAndroid Build Coastguard Worker        Implementations of this method may send arbitrary data, or none, using
89*61c4878aSAndroid Build Coastguard Worker        the ``self.send_data()`` handler.
90*61c4878aSAndroid Build Coastguard Worker        """
91*61c4878aSAndroid Build Coastguard Worker
92*61c4878aSAndroid Build Coastguard Worker    async def __call__(self, data: bytes) -> None:
93*61c4878aSAndroid Build Coastguard Worker        await self.process(data)
94*61c4878aSAndroid Build Coastguard Worker
95*61c4878aSAndroid Build Coastguard Worker
96*61c4878aSAndroid Build Coastguard Workerclass HdlcPacketizer(Filter):
97*61c4878aSAndroid Build Coastguard Worker    """A filter which aggregates data into complete HDLC packets.
98*61c4878aSAndroid Build Coastguard Worker
99*61c4878aSAndroid Build Coastguard Worker    Since the proxy transport (SOCK_STREAM) has no framing and we want some
100*61c4878aSAndroid Build Coastguard Worker    filters to operates on whole frames, this filter can be used so that
101*61c4878aSAndroid Build Coastguard Worker    downstream filters see whole frames.
102*61c4878aSAndroid Build Coastguard Worker    """
103*61c4878aSAndroid Build Coastguard Worker
104*61c4878aSAndroid Build Coastguard Worker    def __init__(self, send_data: Callable[[bytes], Awaitable[None]]):
105*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
106*61c4878aSAndroid Build Coastguard Worker        self.decoder = decode.FrameDecoder()
107*61c4878aSAndroid Build Coastguard Worker
108*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
109*61c4878aSAndroid Build Coastguard Worker        for frame in self.decoder.process(data):
110*61c4878aSAndroid Build Coastguard Worker            await self.send_data(frame.raw_encoded)
111*61c4878aSAndroid Build Coastguard Worker
112*61c4878aSAndroid Build Coastguard Worker
113*61c4878aSAndroid Build Coastguard Workerclass DataDropper(Filter):
114*61c4878aSAndroid Build Coastguard Worker    """A filter which drops some data.
115*61c4878aSAndroid Build Coastguard Worker
116*61c4878aSAndroid Build Coastguard Worker    DataDropper will drop data passed through ``process()`` at the
117*61c4878aSAndroid Build Coastguard Worker    specified ``rate``.
118*61c4878aSAndroid Build Coastguard Worker    """
119*61c4878aSAndroid Build Coastguard Worker
120*61c4878aSAndroid Build Coastguard Worker    def __init__(
121*61c4878aSAndroid Build Coastguard Worker        self,
122*61c4878aSAndroid Build Coastguard Worker        send_data: Callable[[bytes], Awaitable[None]],
123*61c4878aSAndroid Build Coastguard Worker        name: str,
124*61c4878aSAndroid Build Coastguard Worker        rate: float,
125*61c4878aSAndroid Build Coastguard Worker        seed: int | None = None,
126*61c4878aSAndroid Build Coastguard Worker    ):
127*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
128*61c4878aSAndroid Build Coastguard Worker        self._rate = rate
129*61c4878aSAndroid Build Coastguard Worker        self._name = name
130*61c4878aSAndroid Build Coastguard Worker        if seed == None:
131*61c4878aSAndroid Build Coastguard Worker            seed = time.time_ns()
132*61c4878aSAndroid Build Coastguard Worker        self._rng = random.Random(seed)
133*61c4878aSAndroid Build Coastguard Worker        _LOG.info(f'{name} DataDropper initialized with seed {seed}')
134*61c4878aSAndroid Build Coastguard Worker
135*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
136*61c4878aSAndroid Build Coastguard Worker        if self._rng.uniform(0.0, 1.0) < self._rate:
137*61c4878aSAndroid Build Coastguard Worker            _LOG.info(f'{self._name} dropped {len(data)} bytes of data')
138*61c4878aSAndroid Build Coastguard Worker        else:
139*61c4878aSAndroid Build Coastguard Worker            await self.send_data(data)
140*61c4878aSAndroid Build Coastguard Worker
141*61c4878aSAndroid Build Coastguard Worker
142*61c4878aSAndroid Build Coastguard Workerclass KeepDropQueue(Filter):
143*61c4878aSAndroid Build Coastguard Worker    """A filter which alternates between sending packets and dropping packets.
144*61c4878aSAndroid Build Coastguard Worker
145*61c4878aSAndroid Build Coastguard Worker    A KeepDropQueue filter will alternate between keeping packets and dropping
146*61c4878aSAndroid Build Coastguard Worker    chunks of data based on a keep/drop queue provided during its creation. The
147*61c4878aSAndroid Build Coastguard Worker    queue is looped over unless a negative element is found. A negative number
148*61c4878aSAndroid Build Coastguard Worker    is effectively the same as a value of infinity.
149*61c4878aSAndroid Build Coastguard Worker
150*61c4878aSAndroid Build Coastguard Worker     This filter is typically most practical when used with a packetizer so data
151*61c4878aSAndroid Build Coastguard Worker     can be dropped as distinct packets.
152*61c4878aSAndroid Build Coastguard Worker
153*61c4878aSAndroid Build Coastguard Worker    Examples:
154*61c4878aSAndroid Build Coastguard Worker
155*61c4878aSAndroid Build Coastguard Worker      keep_drop_queue = [3, 2]:
156*61c4878aSAndroid Build Coastguard Worker        Keeps 3 packets,
157*61c4878aSAndroid Build Coastguard Worker        Drops 2 packets,
158*61c4878aSAndroid Build Coastguard Worker        Keeps 3 packets,
159*61c4878aSAndroid Build Coastguard Worker        Drops 2 packets,
160*61c4878aSAndroid Build Coastguard Worker        ... [loops indefinitely]
161*61c4878aSAndroid Build Coastguard Worker
162*61c4878aSAndroid Build Coastguard Worker      keep_drop_queue = [5, 99, 1, -1]:
163*61c4878aSAndroid Build Coastguard Worker        Keeps 5 packets,
164*61c4878aSAndroid Build Coastguard Worker        Drops 99 packets,
165*61c4878aSAndroid Build Coastguard Worker        Keeps 1 packet,
166*61c4878aSAndroid Build Coastguard Worker        Drops all further packets.
167*61c4878aSAndroid Build Coastguard Worker    """
168*61c4878aSAndroid Build Coastguard Worker
169*61c4878aSAndroid Build Coastguard Worker    def __init__(
170*61c4878aSAndroid Build Coastguard Worker        self,
171*61c4878aSAndroid Build Coastguard Worker        send_data: Callable[[bytes], Awaitable[None]],
172*61c4878aSAndroid Build Coastguard Worker        name: str,
173*61c4878aSAndroid Build Coastguard Worker        keep_drop_queue: Iterable[int],
174*61c4878aSAndroid Build Coastguard Worker        only_consider_transfer_chunks: bool = False,
175*61c4878aSAndroid Build Coastguard Worker    ):
176*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
177*61c4878aSAndroid Build Coastguard Worker        self._keep_drop_queue = list(keep_drop_queue)
178*61c4878aSAndroid Build Coastguard Worker        self._loop_idx = 0
179*61c4878aSAndroid Build Coastguard Worker        self._current_count = self._keep_drop_queue[0]
180*61c4878aSAndroid Build Coastguard Worker        self._keep = True
181*61c4878aSAndroid Build Coastguard Worker        self._name = name
182*61c4878aSAndroid Build Coastguard Worker        self._only_consider_transfer_chunks = only_consider_transfer_chunks
183*61c4878aSAndroid Build Coastguard Worker
184*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
185*61c4878aSAndroid Build Coastguard Worker        if self._only_consider_transfer_chunks:
186*61c4878aSAndroid Build Coastguard Worker            try:
187*61c4878aSAndroid Build Coastguard Worker                _extract_transfer_chunk(data)
188*61c4878aSAndroid Build Coastguard Worker            except Exception:
189*61c4878aSAndroid Build Coastguard Worker                await self.send_data(data)
190*61c4878aSAndroid Build Coastguard Worker                return
191*61c4878aSAndroid Build Coastguard Worker
192*61c4878aSAndroid Build Coastguard Worker        # Move forward through the queue if needed.
193*61c4878aSAndroid Build Coastguard Worker        while self._current_count == 0:
194*61c4878aSAndroid Build Coastguard Worker            self._loop_idx += 1
195*61c4878aSAndroid Build Coastguard Worker            self._current_count = self._keep_drop_queue[
196*61c4878aSAndroid Build Coastguard Worker                self._loop_idx % len(self._keep_drop_queue)
197*61c4878aSAndroid Build Coastguard Worker            ]
198*61c4878aSAndroid Build Coastguard Worker            self._keep = not self._keep
199*61c4878aSAndroid Build Coastguard Worker
200*61c4878aSAndroid Build Coastguard Worker        if self._current_count > 0:
201*61c4878aSAndroid Build Coastguard Worker            self._current_count -= 1
202*61c4878aSAndroid Build Coastguard Worker
203*61c4878aSAndroid Build Coastguard Worker        if self._keep:
204*61c4878aSAndroid Build Coastguard Worker            await self.send_data(data)
205*61c4878aSAndroid Build Coastguard Worker            _LOG.info(f'{self._name} forwarded {len(data)} bytes of data')
206*61c4878aSAndroid Build Coastguard Worker        else:
207*61c4878aSAndroid Build Coastguard Worker            _LOG.info(f'{self._name} dropped {len(data)} bytes of data')
208*61c4878aSAndroid Build Coastguard Worker
209*61c4878aSAndroid Build Coastguard Worker
210*61c4878aSAndroid Build Coastguard Workerclass RateLimiter(Filter):
211*61c4878aSAndroid Build Coastguard Worker    """A filter which limits transmission rate.
212*61c4878aSAndroid Build Coastguard Worker
213*61c4878aSAndroid Build Coastguard Worker    This filter delays transmission of data by len(data)/rate.
214*61c4878aSAndroid Build Coastguard Worker    """
215*61c4878aSAndroid Build Coastguard Worker
216*61c4878aSAndroid Build Coastguard Worker    def __init__(
217*61c4878aSAndroid Build Coastguard Worker        self, send_data: Callable[[bytes], Awaitable[None]], rate: float
218*61c4878aSAndroid Build Coastguard Worker    ):
219*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
220*61c4878aSAndroid Build Coastguard Worker        self._rate = rate
221*61c4878aSAndroid Build Coastguard Worker
222*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
223*61c4878aSAndroid Build Coastguard Worker        delay = len(data) / self._rate
224*61c4878aSAndroid Build Coastguard Worker        await asyncio.sleep(delay)
225*61c4878aSAndroid Build Coastguard Worker        await self.send_data(data)
226*61c4878aSAndroid Build Coastguard Worker
227*61c4878aSAndroid Build Coastguard Worker
228*61c4878aSAndroid Build Coastguard Workerclass DataTransposer(Filter):
229*61c4878aSAndroid Build Coastguard Worker    """A filter which occasionally transposes two chunks of data.
230*61c4878aSAndroid Build Coastguard Worker
231*61c4878aSAndroid Build Coastguard Worker    This filter transposes data at the specified rate.  It does this by
232*61c4878aSAndroid Build Coastguard Worker    holding a chunk to transpose until another chunk arrives. The filter
233*61c4878aSAndroid Build Coastguard Worker    will not hold a chunk longer than ``timeout`` seconds.
234*61c4878aSAndroid Build Coastguard Worker    """
235*61c4878aSAndroid Build Coastguard Worker
236*61c4878aSAndroid Build Coastguard Worker    def __init__(
237*61c4878aSAndroid Build Coastguard Worker        self,
238*61c4878aSAndroid Build Coastguard Worker        send_data: Callable[[bytes], Awaitable[None]],
239*61c4878aSAndroid Build Coastguard Worker        name: str,
240*61c4878aSAndroid Build Coastguard Worker        rate: float,
241*61c4878aSAndroid Build Coastguard Worker        timeout: float,
242*61c4878aSAndroid Build Coastguard Worker        seed: int,
243*61c4878aSAndroid Build Coastguard Worker    ):
244*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
245*61c4878aSAndroid Build Coastguard Worker        self._name = name
246*61c4878aSAndroid Build Coastguard Worker        self._rate = rate
247*61c4878aSAndroid Build Coastguard Worker        self._timeout = timeout
248*61c4878aSAndroid Build Coastguard Worker        self._data_queue = asyncio.Queue()
249*61c4878aSAndroid Build Coastguard Worker        self._rng = random.Random(seed)
250*61c4878aSAndroid Build Coastguard Worker        self._transpose_task = asyncio.create_task(self._transpose_handler())
251*61c4878aSAndroid Build Coastguard Worker
252*61c4878aSAndroid Build Coastguard Worker        _LOG.info(f'{name} DataTranspose initialized with seed {seed}')
253*61c4878aSAndroid Build Coastguard Worker
254*61c4878aSAndroid Build Coastguard Worker    def __del__(self):
255*61c4878aSAndroid Build Coastguard Worker        _LOG.info(f'{self._name} cleaning up transpose task.')
256*61c4878aSAndroid Build Coastguard Worker        self._transpose_task.cancel()
257*61c4878aSAndroid Build Coastguard Worker
258*61c4878aSAndroid Build Coastguard Worker    async def _transpose_handler(self):
259*61c4878aSAndroid Build Coastguard Worker        """Async task that handles the packet transposition and timeouts"""
260*61c4878aSAndroid Build Coastguard Worker        held_data: bytes | None = None
261*61c4878aSAndroid Build Coastguard Worker        while True:
262*61c4878aSAndroid Build Coastguard Worker            # Only use timeout if we have data held for transposition
263*61c4878aSAndroid Build Coastguard Worker            timeout = None if held_data is None else self._timeout
264*61c4878aSAndroid Build Coastguard Worker            try:
265*61c4878aSAndroid Build Coastguard Worker                data = await asyncio.wait_for(
266*61c4878aSAndroid Build Coastguard Worker                    self._data_queue.get(), timeout=timeout
267*61c4878aSAndroid Build Coastguard Worker                )
268*61c4878aSAndroid Build Coastguard Worker
269*61c4878aSAndroid Build Coastguard Worker                if held_data is not None:
270*61c4878aSAndroid Build Coastguard Worker                    # If we have held data, send it out of order.
271*61c4878aSAndroid Build Coastguard Worker                    await self.send_data(data)
272*61c4878aSAndroid Build Coastguard Worker                    await self.send_data(held_data)
273*61c4878aSAndroid Build Coastguard Worker                    held_data = None
274*61c4878aSAndroid Build Coastguard Worker                else:
275*61c4878aSAndroid Build Coastguard Worker                    # Otherwise decide if we should transpose the current data.
276*61c4878aSAndroid Build Coastguard Worker                    if self._rng.uniform(0.0, 1.0) < self._rate:
277*61c4878aSAndroid Build Coastguard Worker                        _LOG.info(
278*61c4878aSAndroid Build Coastguard Worker                            f'{self._name} transposing {len(data)} bytes of data'
279*61c4878aSAndroid Build Coastguard Worker                        )
280*61c4878aSAndroid Build Coastguard Worker                        held_data = data
281*61c4878aSAndroid Build Coastguard Worker                    else:
282*61c4878aSAndroid Build Coastguard Worker                        await self.send_data(data)
283*61c4878aSAndroid Build Coastguard Worker
284*61c4878aSAndroid Build Coastguard Worker            except asyncio.TimeoutError:
285*61c4878aSAndroid Build Coastguard Worker                _LOG.info(f'{self._name} sending data in order due to timeout')
286*61c4878aSAndroid Build Coastguard Worker                await self.send_data(held_data)
287*61c4878aSAndroid Build Coastguard Worker                held_data = None
288*61c4878aSAndroid Build Coastguard Worker
289*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
290*61c4878aSAndroid Build Coastguard Worker        # Queue data for processing by the transpose task.
291*61c4878aSAndroid Build Coastguard Worker        await self._data_queue.put(data)
292*61c4878aSAndroid Build Coastguard Worker
293*61c4878aSAndroid Build Coastguard Worker
294*61c4878aSAndroid Build Coastguard Workerclass ServerFailure(Filter):
295*61c4878aSAndroid Build Coastguard Worker    """A filter to simulate the server stopping sending packets.
296*61c4878aSAndroid Build Coastguard Worker
297*61c4878aSAndroid Build Coastguard Worker    ServerFailure takes a list of numbers of packets to send before
298*61c4878aSAndroid Build Coastguard Worker    dropping all subsequent packets until a TRANSFER_START packet
299*61c4878aSAndroid Build Coastguard Worker    is seen.  This process is repeated for each element in
300*61c4878aSAndroid Build Coastguard Worker    packets_before_failure.  After that list is exhausted, ServerFailure
301*61c4878aSAndroid Build Coastguard Worker    will send all packets.
302*61c4878aSAndroid Build Coastguard Worker
303*61c4878aSAndroid Build Coastguard Worker    This filter should be instantiated in the same filter stack as an
304*61c4878aSAndroid Build Coastguard Worker    HdlcPacketizer so that EventFilter can decode complete packets.
305*61c4878aSAndroid Build Coastguard Worker    """
306*61c4878aSAndroid Build Coastguard Worker
307*61c4878aSAndroid Build Coastguard Worker    def __init__(
308*61c4878aSAndroid Build Coastguard Worker        self,
309*61c4878aSAndroid Build Coastguard Worker        send_data: Callable[[bytes], Awaitable[None]],
310*61c4878aSAndroid Build Coastguard Worker        name: str,
311*61c4878aSAndroid Build Coastguard Worker        packets_before_failure_list: list[int],
312*61c4878aSAndroid Build Coastguard Worker        start_immediately: bool = False,
313*61c4878aSAndroid Build Coastguard Worker        only_consider_transfer_chunks: bool = False,
314*61c4878aSAndroid Build Coastguard Worker    ):
315*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
316*61c4878aSAndroid Build Coastguard Worker        self._name = name
317*61c4878aSAndroid Build Coastguard Worker        self._relay_packets = True
318*61c4878aSAndroid Build Coastguard Worker        self._packets_before_failure_list = packets_before_failure_list
319*61c4878aSAndroid Build Coastguard Worker        self._packets_before_failure = None
320*61c4878aSAndroid Build Coastguard Worker        self._only_consider_transfer_chunks = only_consider_transfer_chunks
321*61c4878aSAndroid Build Coastguard Worker        if start_immediately:
322*61c4878aSAndroid Build Coastguard Worker            self.advance_packets_before_failure()
323*61c4878aSAndroid Build Coastguard Worker
324*61c4878aSAndroid Build Coastguard Worker    def advance_packets_before_failure(self):
325*61c4878aSAndroid Build Coastguard Worker        if len(self._packets_before_failure_list) > 0:
326*61c4878aSAndroid Build Coastguard Worker            self._packets_before_failure = (
327*61c4878aSAndroid Build Coastguard Worker                self._packets_before_failure_list.pop(0)
328*61c4878aSAndroid Build Coastguard Worker            )
329*61c4878aSAndroid Build Coastguard Worker        else:
330*61c4878aSAndroid Build Coastguard Worker            self._packets_before_failure = None
331*61c4878aSAndroid Build Coastguard Worker
332*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
333*61c4878aSAndroid Build Coastguard Worker        if self._only_consider_transfer_chunks:
334*61c4878aSAndroid Build Coastguard Worker            try:
335*61c4878aSAndroid Build Coastguard Worker                _extract_transfer_chunk(data)
336*61c4878aSAndroid Build Coastguard Worker            except Exception:
337*61c4878aSAndroid Build Coastguard Worker                await self.send_data(data)
338*61c4878aSAndroid Build Coastguard Worker                return
339*61c4878aSAndroid Build Coastguard Worker
340*61c4878aSAndroid Build Coastguard Worker        if self._packets_before_failure is None:
341*61c4878aSAndroid Build Coastguard Worker            await self.send_data(data)
342*61c4878aSAndroid Build Coastguard Worker        elif self._packets_before_failure > 0:
343*61c4878aSAndroid Build Coastguard Worker            self._packets_before_failure -= 1
344*61c4878aSAndroid Build Coastguard Worker            await self.send_data(data)
345*61c4878aSAndroid Build Coastguard Worker
346*61c4878aSAndroid Build Coastguard Worker    def handle_event(self, event: Event) -> None:
347*61c4878aSAndroid Build Coastguard Worker        if event.type is EventType.TRANSFER_START:
348*61c4878aSAndroid Build Coastguard Worker            self.advance_packets_before_failure()
349*61c4878aSAndroid Build Coastguard Worker
350*61c4878aSAndroid Build Coastguard Worker
351*61c4878aSAndroid Build Coastguard Workerclass WindowPacketDropper(Filter):
352*61c4878aSAndroid Build Coastguard Worker    """A filter to allow the same packet in each window to be dropped.
353*61c4878aSAndroid Build Coastguard Worker
354*61c4878aSAndroid Build Coastguard Worker    WindowPacketDropper with drop the nth packet in each window as
355*61c4878aSAndroid Build Coastguard Worker    specified by window_packet_to_drop.  This process will happen
356*61c4878aSAndroid Build Coastguard Worker    indefinitely for each window.
357*61c4878aSAndroid Build Coastguard Worker
358*61c4878aSAndroid Build Coastguard Worker    This filter should be instantiated in the same filter stack as an
359*61c4878aSAndroid Build Coastguard Worker    HdlcPacketizer so that EventFilter can decode complete packets.
360*61c4878aSAndroid Build Coastguard Worker    """
361*61c4878aSAndroid Build Coastguard Worker
362*61c4878aSAndroid Build Coastguard Worker    def __init__(
363*61c4878aSAndroid Build Coastguard Worker        self,
364*61c4878aSAndroid Build Coastguard Worker        send_data: Callable[[bytes], Awaitable[None]],
365*61c4878aSAndroid Build Coastguard Worker        name: str,
366*61c4878aSAndroid Build Coastguard Worker        window_packet_to_drop: int,
367*61c4878aSAndroid Build Coastguard Worker    ):
368*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
369*61c4878aSAndroid Build Coastguard Worker        self._name = name
370*61c4878aSAndroid Build Coastguard Worker        self._relay_packets = True
371*61c4878aSAndroid Build Coastguard Worker        self._window_packet_to_drop = window_packet_to_drop
372*61c4878aSAndroid Build Coastguard Worker        self._next_window_start_offset: int | None = 0
373*61c4878aSAndroid Build Coastguard Worker        self._window_packet = 0
374*61c4878aSAndroid Build Coastguard Worker
375*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
376*61c4878aSAndroid Build Coastguard Worker        data_chunk = None
377*61c4878aSAndroid Build Coastguard Worker        try:
378*61c4878aSAndroid Build Coastguard Worker            chunk = _extract_transfer_chunk(data)
379*61c4878aSAndroid Build Coastguard Worker            if chunk.type is Chunk.Type.DATA:
380*61c4878aSAndroid Build Coastguard Worker                data_chunk = chunk
381*61c4878aSAndroid Build Coastguard Worker        except Exception:
382*61c4878aSAndroid Build Coastguard Worker            # Invalid / non-chunk data (e.g. text logs); ignore.
383*61c4878aSAndroid Build Coastguard Worker            pass
384*61c4878aSAndroid Build Coastguard Worker
385*61c4878aSAndroid Build Coastguard Worker        # Only count transfer data chunks as part of a window.
386*61c4878aSAndroid Build Coastguard Worker        if data_chunk is not None:
387*61c4878aSAndroid Build Coastguard Worker            if data_chunk.offset == self._next_window_start_offset:
388*61c4878aSAndroid Build Coastguard Worker                # If a new window has been requested, wait until the first
389*61c4878aSAndroid Build Coastguard Worker                # chunk matching its requested offset to begin counting window
390*61c4878aSAndroid Build Coastguard Worker                # chunks. Any in-flight chunks from the previous window are
391*61c4878aSAndroid Build Coastguard Worker                # allowed through.
392*61c4878aSAndroid Build Coastguard Worker                self._window_packet = 0
393*61c4878aSAndroid Build Coastguard Worker                self._next_window_start_offset = None
394*61c4878aSAndroid Build Coastguard Worker
395*61c4878aSAndroid Build Coastguard Worker            if self._window_packet != self._window_packet_to_drop:
396*61c4878aSAndroid Build Coastguard Worker                await self.send_data(data)
397*61c4878aSAndroid Build Coastguard Worker
398*61c4878aSAndroid Build Coastguard Worker            self._window_packet += 1
399*61c4878aSAndroid Build Coastguard Worker        else:
400*61c4878aSAndroid Build Coastguard Worker            await self.send_data(data)
401*61c4878aSAndroid Build Coastguard Worker
402*61c4878aSAndroid Build Coastguard Worker    def handle_event(self, event: Event) -> None:
403*61c4878aSAndroid Build Coastguard Worker        if event.type in (
404*61c4878aSAndroid Build Coastguard Worker            EventType.PARAMETERS_RETRANSMIT,
405*61c4878aSAndroid Build Coastguard Worker            EventType.PARAMETERS_CONTINUE,
406*61c4878aSAndroid Build Coastguard Worker            EventType.START_ACK_CONFIRMATION,
407*61c4878aSAndroid Build Coastguard Worker        ):
408*61c4878aSAndroid Build Coastguard Worker            # A new transmission window has been requested, starting at the
409*61c4878aSAndroid Build Coastguard Worker            # offset specified in the chunk. The receiver may already have data
410*61c4878aSAndroid Build Coastguard Worker            # from the previous window in-flight, so don't immediately reset
411*61c4878aSAndroid Build Coastguard Worker            # the window packet counter.
412*61c4878aSAndroid Build Coastguard Worker            self._next_window_start_offset = event.chunk.offset
413*61c4878aSAndroid Build Coastguard Worker
414*61c4878aSAndroid Build Coastguard Worker
415*61c4878aSAndroid Build Coastguard Workerclass EventFilter(Filter):
416*61c4878aSAndroid Build Coastguard Worker    """A filter that inspects packets and send events to other filters.
417*61c4878aSAndroid Build Coastguard Worker
418*61c4878aSAndroid Build Coastguard Worker    This filter should be instantiated in the same filter stack as an
419*61c4878aSAndroid Build Coastguard Worker    HdlcPacketizer so that it can decode complete packets.
420*61c4878aSAndroid Build Coastguard Worker    """
421*61c4878aSAndroid Build Coastguard Worker
422*61c4878aSAndroid Build Coastguard Worker    def __init__(
423*61c4878aSAndroid Build Coastguard Worker        self,
424*61c4878aSAndroid Build Coastguard Worker        send_data: Callable[[bytes], Awaitable[None]],
425*61c4878aSAndroid Build Coastguard Worker        name: str,
426*61c4878aSAndroid Build Coastguard Worker        event_queue: asyncio.Queue,
427*61c4878aSAndroid Build Coastguard Worker    ):
428*61c4878aSAndroid Build Coastguard Worker        super().__init__(send_data)
429*61c4878aSAndroid Build Coastguard Worker        self._name = name
430*61c4878aSAndroid Build Coastguard Worker        self._queue = event_queue
431*61c4878aSAndroid Build Coastguard Worker
432*61c4878aSAndroid Build Coastguard Worker    async def process(self, data: bytes) -> None:
433*61c4878aSAndroid Build Coastguard Worker        try:
434*61c4878aSAndroid Build Coastguard Worker            chunk = _extract_transfer_chunk(data)
435*61c4878aSAndroid Build Coastguard Worker            if chunk.type is Chunk.Type.START:
436*61c4878aSAndroid Build Coastguard Worker                await self._queue.put(Event(EventType.TRANSFER_START, chunk))
437*61c4878aSAndroid Build Coastguard Worker            if chunk.type is Chunk.Type.START_ACK_CONFIRMATION:
438*61c4878aSAndroid Build Coastguard Worker                await self._queue.put(
439*61c4878aSAndroid Build Coastguard Worker                    Event(EventType.START_ACK_CONFIRMATION, chunk)
440*61c4878aSAndroid Build Coastguard Worker                )
441*61c4878aSAndroid Build Coastguard Worker            elif chunk.type is Chunk.Type.PARAMETERS_RETRANSMIT:
442*61c4878aSAndroid Build Coastguard Worker                await self._queue.put(
443*61c4878aSAndroid Build Coastguard Worker                    Event(EventType.PARAMETERS_RETRANSMIT, chunk)
444*61c4878aSAndroid Build Coastguard Worker                )
445*61c4878aSAndroid Build Coastguard Worker            elif chunk.type is Chunk.Type.PARAMETERS_CONTINUE:
446*61c4878aSAndroid Build Coastguard Worker                await self._queue.put(
447*61c4878aSAndroid Build Coastguard Worker                    Event(EventType.PARAMETERS_CONTINUE, chunk)
448*61c4878aSAndroid Build Coastguard Worker                )
449*61c4878aSAndroid Build Coastguard Worker        except:
450*61c4878aSAndroid Build Coastguard Worker            # Silently ignore invalid packets
451*61c4878aSAndroid Build Coastguard Worker            pass
452*61c4878aSAndroid Build Coastguard Worker
453*61c4878aSAndroid Build Coastguard Worker        await self.send_data(data)
454*61c4878aSAndroid Build Coastguard Worker
455*61c4878aSAndroid Build Coastguard Worker
456*61c4878aSAndroid Build Coastguard Workerdef _extract_transfer_chunk(data: bytes) -> Chunk:
457*61c4878aSAndroid Build Coastguard Worker    """Gets a transfer Chunk from an HDLC frame containing an RPC packet.
458*61c4878aSAndroid Build Coastguard Worker
459*61c4878aSAndroid Build Coastguard Worker    Raises an exception if a valid chunk does not exist.
460*61c4878aSAndroid Build Coastguard Worker    """
461*61c4878aSAndroid Build Coastguard Worker
462*61c4878aSAndroid Build Coastguard Worker    decoder = decode.FrameDecoder()
463*61c4878aSAndroid Build Coastguard Worker    for frame in decoder.process(data):
464*61c4878aSAndroid Build Coastguard Worker        packet = packet_pb2.RpcPacket()
465*61c4878aSAndroid Build Coastguard Worker        packet.ParseFromString(frame.data)
466*61c4878aSAndroid Build Coastguard Worker
467*61c4878aSAndroid Build Coastguard Worker        if packet.payload:
468*61c4878aSAndroid Build Coastguard Worker            raw_chunk = transfer_pb2.Chunk()
469*61c4878aSAndroid Build Coastguard Worker            raw_chunk.ParseFromString(packet.payload)
470*61c4878aSAndroid Build Coastguard Worker            return Chunk.from_message(raw_chunk)
471*61c4878aSAndroid Build Coastguard Worker
472*61c4878aSAndroid Build Coastguard Worker        # The incoming data is expected to be HDLC-packetized, so only one
473*61c4878aSAndroid Build Coastguard Worker        # frame should exist.
474*61c4878aSAndroid Build Coastguard Worker        break
475*61c4878aSAndroid Build Coastguard Worker
476*61c4878aSAndroid Build Coastguard Worker    raise ValueError("Invalid transfer chunk frame")
477*61c4878aSAndroid Build Coastguard Worker
478*61c4878aSAndroid Build Coastguard Worker
479*61c4878aSAndroid Build Coastguard Workerasync def _handle_simplex_events(
480*61c4878aSAndroid Build Coastguard Worker    event_queue: asyncio.Queue, handlers: list[Callable[[Event], None]]
481*61c4878aSAndroid Build Coastguard Worker):
482*61c4878aSAndroid Build Coastguard Worker    while True:
483*61c4878aSAndroid Build Coastguard Worker        event = await event_queue.get()
484*61c4878aSAndroid Build Coastguard Worker        for handler in handlers:
485*61c4878aSAndroid Build Coastguard Worker            handler(event)
486*61c4878aSAndroid Build Coastguard Worker
487*61c4878aSAndroid Build Coastguard Worker
488*61c4878aSAndroid Build Coastguard Workerasync def _handle_simplex_connection(
489*61c4878aSAndroid Build Coastguard Worker    name: str,
490*61c4878aSAndroid Build Coastguard Worker    filter_stack_config: list[config_pb2.FilterConfig],
491*61c4878aSAndroid Build Coastguard Worker    reader: asyncio.StreamReader,
492*61c4878aSAndroid Build Coastguard Worker    writer: asyncio.StreamWriter,
493*61c4878aSAndroid Build Coastguard Worker    inbound_event_queue: asyncio.Queue,
494*61c4878aSAndroid Build Coastguard Worker    outbound_event_queue: asyncio.Queue,
495*61c4878aSAndroid Build Coastguard Worker) -> None:
496*61c4878aSAndroid Build Coastguard Worker    """Handle a single direction of a bidirectional connection between
497*61c4878aSAndroid Build Coastguard Worker    server and client."""
498*61c4878aSAndroid Build Coastguard Worker
499*61c4878aSAndroid Build Coastguard Worker    async def send(data: bytes):
500*61c4878aSAndroid Build Coastguard Worker        writer.write(data)
501*61c4878aSAndroid Build Coastguard Worker        await writer.drain()
502*61c4878aSAndroid Build Coastguard Worker
503*61c4878aSAndroid Build Coastguard Worker    filter_stack = EventFilter(send, name, outbound_event_queue)
504*61c4878aSAndroid Build Coastguard Worker
505*61c4878aSAndroid Build Coastguard Worker    event_handlers: list[Callable[[Event], None]] = []
506*61c4878aSAndroid Build Coastguard Worker
507*61c4878aSAndroid Build Coastguard Worker    # Build the filter stack from the bottom up
508*61c4878aSAndroid Build Coastguard Worker    for config in reversed(filter_stack_config):
509*61c4878aSAndroid Build Coastguard Worker        filter_name = config.WhichOneof("filter")
510*61c4878aSAndroid Build Coastguard Worker        if filter_name == "hdlc_packetizer":
511*61c4878aSAndroid Build Coastguard Worker            filter_stack = HdlcPacketizer(filter_stack)
512*61c4878aSAndroid Build Coastguard Worker        elif filter_name == "data_dropper":
513*61c4878aSAndroid Build Coastguard Worker            data_dropper = config.data_dropper
514*61c4878aSAndroid Build Coastguard Worker            filter_stack = DataDropper(
515*61c4878aSAndroid Build Coastguard Worker                filter_stack, name, data_dropper.rate, data_dropper.seed
516*61c4878aSAndroid Build Coastguard Worker            )
517*61c4878aSAndroid Build Coastguard Worker        elif filter_name == "rate_limiter":
518*61c4878aSAndroid Build Coastguard Worker            filter_stack = RateLimiter(filter_stack, config.rate_limiter.rate)
519*61c4878aSAndroid Build Coastguard Worker        elif filter_name == "data_transposer":
520*61c4878aSAndroid Build Coastguard Worker            transposer = config.data_transposer
521*61c4878aSAndroid Build Coastguard Worker            filter_stack = DataTransposer(
522*61c4878aSAndroid Build Coastguard Worker                filter_stack,
523*61c4878aSAndroid Build Coastguard Worker                name,
524*61c4878aSAndroid Build Coastguard Worker                transposer.rate,
525*61c4878aSAndroid Build Coastguard Worker                transposer.timeout,
526*61c4878aSAndroid Build Coastguard Worker                transposer.seed,
527*61c4878aSAndroid Build Coastguard Worker            )
528*61c4878aSAndroid Build Coastguard Worker        elif filter_name == "server_failure":
529*61c4878aSAndroid Build Coastguard Worker            server_failure = config.server_failure
530*61c4878aSAndroid Build Coastguard Worker            filter_stack = ServerFailure(
531*61c4878aSAndroid Build Coastguard Worker                filter_stack,
532*61c4878aSAndroid Build Coastguard Worker                name,
533*61c4878aSAndroid Build Coastguard Worker                server_failure.packets_before_failure,
534*61c4878aSAndroid Build Coastguard Worker                server_failure.start_immediately,
535*61c4878aSAndroid Build Coastguard Worker                server_failure.only_consider_transfer_chunks,
536*61c4878aSAndroid Build Coastguard Worker            )
537*61c4878aSAndroid Build Coastguard Worker            event_handlers.append(filter_stack.handle_event)
538*61c4878aSAndroid Build Coastguard Worker        elif filter_name == "keep_drop_queue":
539*61c4878aSAndroid Build Coastguard Worker            keep_drop_queue = config.keep_drop_queue
540*61c4878aSAndroid Build Coastguard Worker            filter_stack = KeepDropQueue(
541*61c4878aSAndroid Build Coastguard Worker                filter_stack,
542*61c4878aSAndroid Build Coastguard Worker                name,
543*61c4878aSAndroid Build Coastguard Worker                keep_drop_queue.keep_drop_queue,
544*61c4878aSAndroid Build Coastguard Worker                keep_drop_queue.only_consider_transfer_chunks,
545*61c4878aSAndroid Build Coastguard Worker            )
546*61c4878aSAndroid Build Coastguard Worker        elif filter_name == "window_packet_dropper":
547*61c4878aSAndroid Build Coastguard Worker            window_packet_dropper = config.window_packet_dropper
548*61c4878aSAndroid Build Coastguard Worker            filter_stack = WindowPacketDropper(
549*61c4878aSAndroid Build Coastguard Worker                filter_stack, name, window_packet_dropper.window_packet_to_drop
550*61c4878aSAndroid Build Coastguard Worker            )
551*61c4878aSAndroid Build Coastguard Worker            event_handlers.append(filter_stack.handle_event)
552*61c4878aSAndroid Build Coastguard Worker        else:
553*61c4878aSAndroid Build Coastguard Worker            sys.exit(f'Unknown filter {filter_name}')
554*61c4878aSAndroid Build Coastguard Worker
555*61c4878aSAndroid Build Coastguard Worker    event_task = asyncio.create_task(
556*61c4878aSAndroid Build Coastguard Worker        _handle_simplex_events(inbound_event_queue, event_handlers)
557*61c4878aSAndroid Build Coastguard Worker    )
558*61c4878aSAndroid Build Coastguard Worker
559*61c4878aSAndroid Build Coastguard Worker    while True:
560*61c4878aSAndroid Build Coastguard Worker        # Arbitrarily chosen "page sized" read.
561*61c4878aSAndroid Build Coastguard Worker        data = await reader.read(4096)
562*61c4878aSAndroid Build Coastguard Worker
563*61c4878aSAndroid Build Coastguard Worker        # An empty data indicates that the connection is closed.
564*61c4878aSAndroid Build Coastguard Worker        if not data:
565*61c4878aSAndroid Build Coastguard Worker            _LOG.info(f'{name} connection closed.')
566*61c4878aSAndroid Build Coastguard Worker            return
567*61c4878aSAndroid Build Coastguard Worker
568*61c4878aSAndroid Build Coastguard Worker        await filter_stack.process(data)
569*61c4878aSAndroid Build Coastguard Worker
570*61c4878aSAndroid Build Coastguard Worker
571*61c4878aSAndroid Build Coastguard Workerasync def _handle_connection(
572*61c4878aSAndroid Build Coastguard Worker    server_port: int,
573*61c4878aSAndroid Build Coastguard Worker    config: config_pb2.ProxyConfig,
574*61c4878aSAndroid Build Coastguard Worker    client_reader: asyncio.StreamReader,
575*61c4878aSAndroid Build Coastguard Worker    client_writer: asyncio.StreamWriter,
576*61c4878aSAndroid Build Coastguard Worker) -> None:
577*61c4878aSAndroid Build Coastguard Worker    """Handle a connection between server and client."""
578*61c4878aSAndroid Build Coastguard Worker
579*61c4878aSAndroid Build Coastguard Worker    client_addr = client_writer.get_extra_info('peername')
580*61c4878aSAndroid Build Coastguard Worker    _LOG.info(f'New client connection from {client_addr}')
581*61c4878aSAndroid Build Coastguard Worker
582*61c4878aSAndroid Build Coastguard Worker    # Open a new connection to the server for each client connection.
583*61c4878aSAndroid Build Coastguard Worker    #
584*61c4878aSAndroid Build Coastguard Worker    # TODO(konkers): catch exception and close client writer
585*61c4878aSAndroid Build Coastguard Worker    server_reader, server_writer = await asyncio.open_connection(
586*61c4878aSAndroid Build Coastguard Worker        'localhost', server_port
587*61c4878aSAndroid Build Coastguard Worker    )
588*61c4878aSAndroid Build Coastguard Worker    _LOG.info('New connection opened to server')
589*61c4878aSAndroid Build Coastguard Worker
590*61c4878aSAndroid Build Coastguard Worker    # Queues for the simplex connections to pass events to each other.
591*61c4878aSAndroid Build Coastguard Worker    server_event_queue = asyncio.Queue()
592*61c4878aSAndroid Build Coastguard Worker    client_event_queue = asyncio.Queue()
593*61c4878aSAndroid Build Coastguard Worker
594*61c4878aSAndroid Build Coastguard Worker    # Instantiate two simplex handler one for each direction of the connection.
595*61c4878aSAndroid Build Coastguard Worker    _, pending = await asyncio.wait(
596*61c4878aSAndroid Build Coastguard Worker        [
597*61c4878aSAndroid Build Coastguard Worker            asyncio.create_task(
598*61c4878aSAndroid Build Coastguard Worker                _handle_simplex_connection(
599*61c4878aSAndroid Build Coastguard Worker                    "client",
600*61c4878aSAndroid Build Coastguard Worker                    config.client_filter_stack,
601*61c4878aSAndroid Build Coastguard Worker                    client_reader,
602*61c4878aSAndroid Build Coastguard Worker                    server_writer,
603*61c4878aSAndroid Build Coastguard Worker                    server_event_queue,
604*61c4878aSAndroid Build Coastguard Worker                    client_event_queue,
605*61c4878aSAndroid Build Coastguard Worker                )
606*61c4878aSAndroid Build Coastguard Worker            ),
607*61c4878aSAndroid Build Coastguard Worker            asyncio.create_task(
608*61c4878aSAndroid Build Coastguard Worker                _handle_simplex_connection(
609*61c4878aSAndroid Build Coastguard Worker                    "server",
610*61c4878aSAndroid Build Coastguard Worker                    config.server_filter_stack,
611*61c4878aSAndroid Build Coastguard Worker                    server_reader,
612*61c4878aSAndroid Build Coastguard Worker                    client_writer,
613*61c4878aSAndroid Build Coastguard Worker                    client_event_queue,
614*61c4878aSAndroid Build Coastguard Worker                    server_event_queue,
615*61c4878aSAndroid Build Coastguard Worker                )
616*61c4878aSAndroid Build Coastguard Worker            ),
617*61c4878aSAndroid Build Coastguard Worker        ],
618*61c4878aSAndroid Build Coastguard Worker        return_when=asyncio.FIRST_COMPLETED,
619*61c4878aSAndroid Build Coastguard Worker    )
620*61c4878aSAndroid Build Coastguard Worker
621*61c4878aSAndroid Build Coastguard Worker    # When one side terminates the connection, also terminate the other side
622*61c4878aSAndroid Build Coastguard Worker    for task in pending:
623*61c4878aSAndroid Build Coastguard Worker        task.cancel()
624*61c4878aSAndroid Build Coastguard Worker
625*61c4878aSAndroid Build Coastguard Worker    for stream in [client_writer, server_writer]:
626*61c4878aSAndroid Build Coastguard Worker        stream.close()
627*61c4878aSAndroid Build Coastguard Worker
628*61c4878aSAndroid Build Coastguard Worker
629*61c4878aSAndroid Build Coastguard Workerdef _parse_args() -> argparse.Namespace:
630*61c4878aSAndroid Build Coastguard Worker    parser = argparse.ArgumentParser(
631*61c4878aSAndroid Build Coastguard Worker        description=__doc__,
632*61c4878aSAndroid Build Coastguard Worker        formatter_class=argparse.RawDescriptionHelpFormatter,
633*61c4878aSAndroid Build Coastguard Worker    )
634*61c4878aSAndroid Build Coastguard Worker
635*61c4878aSAndroid Build Coastguard Worker    parser.add_argument(
636*61c4878aSAndroid Build Coastguard Worker        '--server-port',
637*61c4878aSAndroid Build Coastguard Worker        type=int,
638*61c4878aSAndroid Build Coastguard Worker        required=True,
639*61c4878aSAndroid Build Coastguard Worker        help='Port of the integration test server.  The proxy will forward connections to this port',
640*61c4878aSAndroid Build Coastguard Worker    )
641*61c4878aSAndroid Build Coastguard Worker    parser.add_argument(
642*61c4878aSAndroid Build Coastguard Worker        '--client-port',
643*61c4878aSAndroid Build Coastguard Worker        type=int,
644*61c4878aSAndroid Build Coastguard Worker        required=True,
645*61c4878aSAndroid Build Coastguard Worker        help='Port on which to listen for connections from integration test client.',
646*61c4878aSAndroid Build Coastguard Worker    )
647*61c4878aSAndroid Build Coastguard Worker
648*61c4878aSAndroid Build Coastguard Worker    return parser.parse_args()
649*61c4878aSAndroid Build Coastguard Worker
650*61c4878aSAndroid Build Coastguard Worker
651*61c4878aSAndroid Build Coastguard Workerdef _init_logging(level: int) -> None:
652*61c4878aSAndroid Build Coastguard Worker    _LOG.setLevel(logging.DEBUG)
653*61c4878aSAndroid Build Coastguard Worker    log_to_stderr = logging.StreamHandler()
654*61c4878aSAndroid Build Coastguard Worker    log_to_stderr.setLevel(level)
655*61c4878aSAndroid Build Coastguard Worker    log_to_stderr.setFormatter(
656*61c4878aSAndroid Build Coastguard Worker        logging.Formatter(
657*61c4878aSAndroid Build Coastguard Worker            fmt='%(asctime)s.%(msecs)03d-%(levelname)s: %(message)s',
658*61c4878aSAndroid Build Coastguard Worker            datefmt='%H:%M:%S',
659*61c4878aSAndroid Build Coastguard Worker        )
660*61c4878aSAndroid Build Coastguard Worker    )
661*61c4878aSAndroid Build Coastguard Worker
662*61c4878aSAndroid Build Coastguard Worker    _LOG.addHandler(log_to_stderr)
663*61c4878aSAndroid Build Coastguard Worker
664*61c4878aSAndroid Build Coastguard Worker
665*61c4878aSAndroid Build Coastguard Workerasync def _main(server_port: int, client_port: int) -> None:
666*61c4878aSAndroid Build Coastguard Worker    _init_logging(logging.DEBUG)
667*61c4878aSAndroid Build Coastguard Worker
668*61c4878aSAndroid Build Coastguard Worker    # Load config from stdin using synchronous IO
669*61c4878aSAndroid Build Coastguard Worker    text_config = sys.stdin.buffer.read()
670*61c4878aSAndroid Build Coastguard Worker
671*61c4878aSAndroid Build Coastguard Worker    config = text_format.Parse(text_config, config_pb2.ProxyConfig())
672*61c4878aSAndroid Build Coastguard Worker
673*61c4878aSAndroid Build Coastguard Worker    # Instantiate the TCP server.
674*61c4878aSAndroid Build Coastguard Worker    server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
675*61c4878aSAndroid Build Coastguard Worker    server_socket.setsockopt(
676*61c4878aSAndroid Build Coastguard Worker        socket.SOL_SOCKET, socket.SO_RCVBUF, _RECEIVE_BUFFER_SIZE
677*61c4878aSAndroid Build Coastguard Worker    )
678*61c4878aSAndroid Build Coastguard Worker    server_socket.bind(('', client_port))
679*61c4878aSAndroid Build Coastguard Worker    server = await asyncio.start_server(
680*61c4878aSAndroid Build Coastguard Worker        lambda reader, writer: _handle_connection(
681*61c4878aSAndroid Build Coastguard Worker            server_port, config, reader, writer
682*61c4878aSAndroid Build Coastguard Worker        ),
683*61c4878aSAndroid Build Coastguard Worker        limit=_RECEIVE_BUFFER_SIZE,
684*61c4878aSAndroid Build Coastguard Worker        sock=server_socket,
685*61c4878aSAndroid Build Coastguard Worker    )
686*61c4878aSAndroid Build Coastguard Worker
687*61c4878aSAndroid Build Coastguard Worker    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
688*61c4878aSAndroid Build Coastguard Worker    _LOG.info(f'Listening for client connection on {addrs}')
689*61c4878aSAndroid Build Coastguard Worker
690*61c4878aSAndroid Build Coastguard Worker    # Run the TCP server.
691*61c4878aSAndroid Build Coastguard Worker    async with server:
692*61c4878aSAndroid Build Coastguard Worker        await server.serve_forever()
693*61c4878aSAndroid Build Coastguard Worker
694*61c4878aSAndroid Build Coastguard Worker
695*61c4878aSAndroid Build Coastguard Workerif __name__ == '__main__':
696*61c4878aSAndroid Build Coastguard Worker    asyncio.run(_main(**vars(_parse_args())))
697