xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import contextlib
16import importlib
17import itertools
18import os
19from os import path
20import pkgutil
21import shutil
22import sys
23import tempfile
24import threading
25import unittest
26
27from grpc.beta import implementations
28from grpc.beta import interfaces
29from grpc.framework.foundation import future
30from grpc.framework.interfaces.face import face
31from grpc_tools import protoc
32
33from tests.unit.framework.common import test_constants
34
# Sub-directories (relative to the per-test temporary directory) that hold the
# rewritten .proto sources and the code protoc generates from them.
_RELATIVE_PROTO_PATH = "relative_proto_path"
_RELATIVE_PYTHON_OUT = "relative_python_out"

# Path components of every .proto file the test compiles. The leading
# component is the package directory the protos are relocated into; the
# remaining components locate the file below it.
_PROTO_FILES_PATH_COMPONENTS = (
    ("beta_grpc_plugin_test", "payload", "test_payload.proto"),
    ("beta_grpc_plugin_test", "requests", "r", "test_requests.proto"),
    ("beta_grpc_plugin_test", "responses", "test_responses.proto"),
    ("beta_grpc_plugin_test", "service", "test_service.proto"),
)

# Dotted names of the generated modules, one per .proto file above.
_PAYLOAD_PB2 = "beta_grpc_plugin_test.payload.test_payload_pb2"
_REQUESTS_PB2 = "beta_grpc_plugin_test.requests.r.test_requests_pb2"
_RESPONSES_PB2 = "beta_grpc_plugin_test.responses.test_responses_pb2"
_SERVICE_PB2 = "beta_grpc_plugin_test.service.test_service_pb2"

# Names of the entities the generated service module is expected to expose.
SERVICER_IDENTIFIER = "BetaTestServiceServicer"
STUB_IDENTIFIER = "BetaTestServiceStub"
SERVER_FACTORY_IDENTIFIER = "beta_create_TestService_server"
STUB_FACTORY_IDENTIFIER = "beta_create_TestService_stub"
72
73
@contextlib.contextmanager
def _system_path(path_insertion):
    """Temporarily splice extra entries into ``sys.path``.

    The entries are placed directly after the first element of ``sys.path``,
    so they take precedence over everything except that first entry.

    Args:
      path_insertion: A list of path strings to insert.

    Yields:
      Nothing. On exit ``sys.path`` is rebound to a copy of its value from
      before entry, even when the managed block raises.
    """
    old_system_path = sys.path[:]
    sys.path = sys.path[0:1] + path_insertion + sys.path[1:]
    try:
        yield
    finally:
        # Restore unconditionally so a failed import inside the managed block
        # cannot leave the temporary entries behind for later code.
        sys.path = old_system_path
80
81
def _create_directory_tree(root, path_components_sequence):
    """Create nested directories beneath ``root``.

    Args:
      root: Directory under which the tree is created.
      path_components_sequence: An iterable of sequences of path components;
        a directory is created for every prefix of every sequence.
    """
    already_made = set()
    for components in path_components_sequence:
        prefix = ""
        for component in components:
            prefix = path.join(prefix, component)
            # Prefixes shared between sequences are only created once.
            if prefix in already_made:
                continue
            os.makedirs(path.join(root, prefix))
            already_made.add(prefix)
92
93
def _massage_proto_content(raw_proto_content):
    """Rewrite import paths and the package statement of a .proto file.

    Args:
      raw_proto_content: The original contents of the .proto file, as bytes.

    Returns:
      The contents with imports pointed at ``beta_grpc_plugin_test/`` and the
      package renamed to ``beta_grpc_protoc_plugin``.
    """
    return raw_proto_content.replace(
        b'import "tests/protoc_plugin/protos/',
        b'import "beta_grpc_plugin_test/',
    ).replace(
        b"package grpc_protoc_plugin;",
        b"package beta_grpc_protoc_plugin;",
    )
103
104
def _packagify(directory):
    """Write an empty ``__init__.py`` into ``directory`` and every directory below it.

    Args:
      directory: Root of the directory tree to turn into Python packages.
    """
    for package_directory, _subdirectories, _files in os.walk(directory):
        # Opening in "wb" mode creates the file, or truncates it to empty.
        with open(path.join(package_directory, "__init__.py"), "wb"):
            pass
110
111
class _ServicerMethods(object):
    """Implementations of the test service's RPCs with controllable behavior.

    The ``pause`` and ``fail`` context managers let a test make the RPC
    implementations block or raise for the duration of a ``with`` block.
    """

    def __init__(self, payload_pb2, responses_pb2):
        """Initializes the servicer methods.

        Args:
          payload_pb2: The generated payload module (provides COMPRESSABLE).
          responses_pb2: The generated responses module (provides the
            response message classes).
        """
        # Guards _paused and _fail; also used to wake handlers blocked in
        # _control once a pause ends.
        self._condition = threading.Condition()
        self._paused = False
        self._fail = False
        self._payload_pb2 = payload_pb2
        self._responses_pb2 = responses_pb2

    @contextlib.contextmanager
    def pause(self):  # pylint: disable=invalid-name
        """Make RPC implementations block in ``_control`` while active."""
        with self._condition:
            self._paused = True
        try:
            yield
        finally:
            # Always unpause, even if the managed block raised, so handlers
            # blocked in _control are not left waiting forever.
            with self._condition:
                self._paused = False
                self._condition.notify_all()

    @contextlib.contextmanager
    def fail(self):  # pylint: disable=invalid-name
        """Make RPC implementations raise ``ValueError`` while active."""
        with self._condition:
            self._fail = True
        try:
            yield
        finally:
            # Always clear the flag so a failure in the managed block does not
            # leak the failing mode into later calls.
            with self._condition:
                self._fail = False

    def _control(self):  # pylint: disable=invalid-name
        """Apply the current mode: raise if failing, block while paused.

        Raises:
          ValueError: If the ``fail`` mode is active.
        """
        with self._condition:
            if self._fail:
                raise ValueError()
            while self._paused:
                self._condition.wait()

    def _streaming_output_response(self, size):
        """Build a StreamingOutputCallResponse carrying ``size`` 'a' characters."""
        response = self._responses_pb2.StreamingOutputCallResponse()
        response.payload.payload_type = self._payload_pb2.COMPRESSABLE
        response.payload.payload_compressable = "a" * size
        return response

    def UnaryCall(self, request, unused_rpc_context):
        """Return a SimpleResponse whose payload has ``request.response_size`` characters."""
        response = self._responses_pb2.SimpleResponse()
        response.payload.payload_type = self._payload_pb2.COMPRESSABLE
        response.payload.payload_compressable = "a" * request.response_size
        self._control()
        return response

    def StreamingOutputCall(self, request, unused_rpc_context):
        """Yield one response per entry of ``request.response_parameters``."""
        for parameter in request.response_parameters:
            response = self._streaming_output_response(parameter.size)
            self._control()
            yield response

    def StreamingInputCall(self, request_iter, unused_rpc_context):
        """Return a response holding the summed payload length of all requests."""
        response = self._responses_pb2.StreamingInputCallResponse()
        aggregated_payload_size = 0
        for request in request_iter:
            aggregated_payload_size += len(request.payload.payload_compressable)
        response.aggregated_payload_size = aggregated_payload_size
        self._control()
        return response

    def FullDuplexCall(self, request_iter, unused_rpc_context):
        """Yield responses for each request as soon as that request is read."""
        for request in request_iter:
            for parameter in request.response_parameters:
                response = self._streaming_output_response(parameter.size)
                self._control()
                yield response

    def HalfDuplexCall(self, request_iter, unused_rpc_context):
        """Yield responses only after every request has been consumed."""
        responses = []
        for request in request_iter:
            for parameter in request.response_parameters:
                response = self._streaming_output_response(parameter.size)
                self._control()
                responses.append(response)
        for response in responses:
            yield response
188
189
@contextlib.contextmanager
def _CreateService(payload_pb2, responses_pb2, service_pb2):
    """Provides a servicer backend and a stub.

    A servicer subclass of the generated servicer class is served on an
    insecure port; each of its RPC methods delegates to a separate
    _ServicerMethods object, which is what the caller receives for control.

    Args:
      payload_pb2: The payload_pb2 module generated by this test.
      responses_pb2: The responses_pb2 module generated by this test.
      service_pb2: The service_pb2 module generated by this test.

    Yields:
      A (servicer_methods, stub) pair where servicer_methods is the back-end of
        the service bound to the stub and stub is the stub on which to invoke
        RPCs.
    """
    servicer_methods = _ServicerMethods(payload_pb2, responses_pb2)

    class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
        def UnaryCall(self, request, context):
            return servicer_methods.UnaryCall(request, context)

        def StreamingOutputCall(self, request, context):
            return servicer_methods.StreamingOutputCall(request, context)

        def StreamingInputCall(self, request_iter, context):
            return servicer_methods.StreamingInputCall(request_iter, context)

        def FullDuplexCall(self, request_iter, context):
            return servicer_methods.FullDuplexCall(request_iter, context)

        def HalfDuplexCall(self, request_iter, context):
            return servicer_methods.HalfDuplexCall(request_iter, context)

    servicer = Servicer()
    server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
    port = server.add_insecure_port("[::]:0")
    server.start()
    try:
        channel = implementations.insecure_channel("localhost", port)
        stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
        yield servicer_methods, stub
    finally:
        # Stop the server even when the managed block raises (for example on a
        # failed assertion), so a started server is never left running.
        server.stop(0)
228
229
@contextlib.contextmanager
def _CreateIncompleteService(service_pb2):
    """Provides a servicer backend that fails to implement methods and its stub.

    The servicer is a subclass of the generated servicer class that overrides
    none of its methods; it is served on an insecure port.

    Args:
      service_pb2: The service_pb2 module generated by this test.

    Yields:
      A (None, stub) pair where stub is the stub on which to invoke RPCs; the
        first element is None because there is no controllable back-end.
    """

    class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
        pass

    servicer = Servicer()
    server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
    port = server.add_insecure_port("[::]:0")
    server.start()
    try:
        channel = implementations.insecure_channel("localhost", port)
        stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
        yield None, stub
    finally:
        # Stop the server even when the managed block raises, so a started
        # server is never left running.
        server.stop(0)
255
256
def _streaming_input_request_iterator(payload_pb2, requests_pb2):
    """Yield three StreamingInputCallRequest messages, each with payload "a".

    Args:
      payload_pb2: Module providing the COMPRESSABLE payload type.
      requests_pb2: Module providing StreamingInputCallRequest.
    """
    remaining = 3
    while remaining > 0:
        message = requests_pb2.StreamingInputCallRequest()
        message.payload.payload_type = payload_pb2.COMPRESSABLE
        message.payload.payload_compressable = "a"
        yield message
        remaining -= 1
263
264
def _streaming_output_request(requests_pb2):
    """Build a StreamingOutputCallRequest asking for responses of size 1, 2 and 3.

    Args:
      requests_pb2: Module providing StreamingOutputCallRequest.

    Returns:
      The populated request message.
    """
    message = requests_pb2.StreamingOutputCallRequest()
    for response_size in (1, 2, 3):
        message.response_parameters.add(size=response_size, interval_us=0)
    return message
272
273
def _full_duplex_request_iterator(requests_pb2):
    """Yield two StreamingOutputCallRequest messages.

    The first asks for one response of size 1; the second asks for responses
    of size 2 and 3.

    Args:
      requests_pb2: Module providing StreamingOutputCallRequest.
    """
    for size_group in ((1,), (2, 3)):
        message = requests_pb2.StreamingOutputCallRequest()
        for response_size in size_group:
            message.response_parameters.add(size=response_size, interval_us=0)
        yield message
282
283
class PythonPluginTest(unittest.TestCase):
    """Test case for the gRPC Python protoc-plugin.

    While reading these tests, remember that the futures API
    (`stub.method.future()`) only gives futures for the *response-unary*
    methods and does not exist for response-streaming methods.
    """

    def setUp(self):
        # Lay out a scratch directory holding rewritten copies of the test
        # .proto files (under _RELATIVE_PROTO_PATH) and an empty output
        # directory for generated code (under _RELATIVE_PYTHON_OUT).
        self._directory = tempfile.mkdtemp(dir=".")
        self._proto_path = path.join(self._directory, _RELATIVE_PROTO_PATH)
        self._python_out = path.join(self._directory, _RELATIVE_PYTHON_OUT)

        os.makedirs(self._proto_path)
        os.makedirs(self._python_out)

        # Directory part (everything but the file name) of each proto path.
        directories_path_components = {
            proto_file_path_components[:-1]
            for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS
        }
        _create_directory_tree(self._proto_path, directories_path_components)
        self._proto_file_names = set()
        for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS:
            # The packaged resource path omits the leading package-directory
            # component used for the relocated copy.
            raw_proto_content = pkgutil.get_data(
                "tests.protoc_plugin.protos",
                path.join(*proto_file_path_components[1:]),
            )
            massaged_proto_content = _massage_proto_content(raw_proto_content)
            proto_file_name = path.join(
                self._proto_path, *proto_file_path_components
            )
            with open(proto_file_name, "wb") as proto_file:
                proto_file.write(massaged_proto_content)
            self._proto_file_names.add(proto_file_name)

    def tearDown(self):
        shutil.rmtree(self._directory)

    def _protoc(self):
        # Run protoc with the gRPC plugin over the prepared protos, turn the
        # output into importable packages and import the generated modules.
        args = [
            "",
            "--proto_path={}".format(self._proto_path),
            "--python_out={}".format(self._python_out),
            "--grpc_python_out=grpc_1_0:{}".format(self._python_out),
        ] + list(self._proto_file_names)
        protoc_exit_code = protoc.main(args)
        self.assertEqual(0, protoc_exit_code)

        _packagify(self._python_out)

        with _system_path([self._python_out]):
            self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2)
            self._requests_pb2 = importlib.import_module(_REQUESTS_PB2)
            self._responses_pb2 = importlib.import_module(_RESPONSES_PB2)
            self._service_pb2 = importlib.import_module(_SERVICE_PB2)

    def testImportAttributes(self):
        self._protoc()

        # check that we can access the generated module and its members.
        self.assertIsNotNone(
            getattr(self._service_pb2, SERVICER_IDENTIFIER, None)
        )
        self.assertIsNotNone(getattr(self._service_pb2, STUB_IDENTIFIER, None))
        self.assertIsNotNone(
            getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None)
        )
        self.assertIsNotNone(
            getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None)
        )

    def testUpDown(self):
        # The service can be started and stopped without issuing any RPC.
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ):
            self._requests_pb2.SimpleRequest(response_size=13)

    def testIncompleteServicer(self):
        # Calling a method the servicer does not override must abort the RPC
        # with UNIMPLEMENTED.
        self._protoc()

        with _CreateIncompleteService(self._service_pb2) as (_, stub):
            request = self._requests_pb2.SimpleRequest(response_size=13)
            # assertRaises makes the test fail when the call unexpectedly
            # succeeds, instead of passing without checking anything.
            with self.assertRaises(face.AbortionError) as exception_context:
                stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
            self.assertEqual(
                interfaces.StatusCode.UNIMPLEMENTED,
                exception_context.exception.code,
            )

    def testUnaryCall(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = self._requests_pb2.SimpleRequest(response_size=13)
            response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
        # The expected value comes from calling the back-end directly.
        expected_response = methods.UnaryCall(request, "not a real context!")
        self.assertEqual(expected_response, response)

    def testUnaryCallFuture(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = self._requests_pb2.SimpleRequest(response_size=13)
            # Check that the call does not block waiting for the server to respond.
            with methods.pause():
                response_future = stub.UnaryCall.future(
                    request, test_constants.LONG_TIMEOUT
                )
            response = response_future.result()
        expected_response = methods.UnaryCall(request, "not a real RpcContext!")
        self.assertEqual(expected_response, response)

    def testUnaryCallFutureExpired(self):
        # With the servicer paused, a short-timeout call must expire.
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = self._requests_pb2.SimpleRequest(response_size=13)
            with methods.pause():
                response_future = stub.UnaryCall.future(
                    request, test_constants.SHORT_TIMEOUT
                )
                with self.assertRaises(face.ExpirationError):
                    response_future.result()

    def testUnaryCallFutureCancelled(self):
        # A future cancelled while the servicer is paused reports cancelled.
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = self._requests_pb2.SimpleRequest(response_size=13)
            with methods.pause():
                response_future = stub.UnaryCall.future(request, 1)
                response_future.cancel()
                self.assertTrue(response_future.cancelled())

    def testUnaryCallFutureFailed(self):
        # With the servicer failing, the future must carry an exception.
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = self._requests_pb2.SimpleRequest(response_size=13)
            with methods.fail():
                response_future = stub.UnaryCall.future(
                    request, test_constants.LONG_TIMEOUT
                )
                self.assertIsNotNone(response_future.exception())

    def testStreamingOutputCall(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = _streaming_output_request(self._requests_pb2)
            responses = stub.StreamingOutputCall(
                request, test_constants.LONG_TIMEOUT
            )
            expected_responses = methods.StreamingOutputCall(
                request, "not a real RpcContext!"
            )
            # zip_longest pads the shorter stream with None, so a length
            # mismatch shows up as a failed equality check.
            for expected_response, response in itertools.zip_longest(
                expected_responses, responses
            ):
                self.assertEqual(expected_response, response)

    def testStreamingOutputCallExpired(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = _streaming_output_request(self._requests_pb2)
            with methods.pause():
                responses = stub.StreamingOutputCall(
                    request, test_constants.SHORT_TIMEOUT
                )
                with self.assertRaises(face.ExpirationError):
                    list(responses)

    def testStreamingOutputCallCancelled(self):
        # Cancelling after the first response makes the next read raise.
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = _streaming_output_request(self._requests_pb2)
            responses = stub.StreamingOutputCall(
                request, test_constants.LONG_TIMEOUT
            )
            next(responses)
            responses.cancel()
            with self.assertRaises(face.CancellationError):
                next(responses)

    def testStreamingOutputCallFailed(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request = _streaming_output_request(self._requests_pb2)
            with methods.fail():
                responses = stub.StreamingOutputCall(request, 1)
                self.assertIsNotNone(responses)
                with self.assertRaises(face.RemoteError):
                    next(responses)

    def testStreamingInputCall(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            response = stub.StreamingInputCall(
                _streaming_input_request_iterator(
                    self._payload_pb2, self._requests_pb2
                ),
                test_constants.LONG_TIMEOUT,
            )
        expected_response = methods.StreamingInputCall(
            _streaming_input_request_iterator(
                self._payload_pb2, self._requests_pb2
            ),
            "not a real RpcContext!",
        )
        self.assertEqual(expected_response, response)

    def testStreamingInputCallFuture(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            # The future is created while the servicer is paused and resolved
            # only after the pause has ended.
            with methods.pause():
                response_future = stub.StreamingInputCall.future(
                    _streaming_input_request_iterator(
                        self._payload_pb2, self._requests_pb2
                    ),
                    test_constants.LONG_TIMEOUT,
                )
            response = response_future.result()
        expected_response = methods.StreamingInputCall(
            _streaming_input_request_iterator(
                self._payload_pb2, self._requests_pb2
            ),
            "not a real RpcContext!",
        )
        self.assertEqual(expected_response, response)

    def testStreamingInputCallFutureExpired(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            with methods.pause():
                response_future = stub.StreamingInputCall.future(
                    _streaming_input_request_iterator(
                        self._payload_pb2, self._requests_pb2
                    ),
                    test_constants.SHORT_TIMEOUT,
                )
                with self.assertRaises(face.ExpirationError):
                    response_future.result()
                self.assertIsInstance(
                    response_future.exception(), face.ExpirationError
                )

    def testStreamingInputCallFutureCancelled(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            with methods.pause():
                response_future = stub.StreamingInputCall.future(
                    _streaming_input_request_iterator(
                        self._payload_pb2, self._requests_pb2
                    ),
                    test_constants.LONG_TIMEOUT,
                )
                response_future.cancel()
                self.assertTrue(response_future.cancelled())
            with self.assertRaises(future.CancelledError):
                response_future.result()

    def testStreamingInputCallFutureFailed(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            with methods.fail():
                response_future = stub.StreamingInputCall.future(
                    _streaming_input_request_iterator(
                        self._payload_pb2, self._requests_pb2
                    ),
                    test_constants.LONG_TIMEOUT,
                )
                self.assertIsNotNone(response_future.exception())

    def testFullDuplexCall(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            responses = stub.FullDuplexCall(
                _full_duplex_request_iterator(self._requests_pb2),
                test_constants.LONG_TIMEOUT,
            )
            expected_responses = methods.FullDuplexCall(
                _full_duplex_request_iterator(self._requests_pb2),
                "not a real RpcContext!",
            )
            for expected_response, response in itertools.zip_longest(
                expected_responses, responses
            ):
                self.assertEqual(expected_response, response)

    def testFullDuplexCallExpired(self):
        self._protoc()

        request_iterator = _full_duplex_request_iterator(self._requests_pb2)
        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            with methods.pause():
                responses = stub.FullDuplexCall(
                    request_iterator, test_constants.SHORT_TIMEOUT
                )
                with self.assertRaises(face.ExpirationError):
                    list(responses)

    def testFullDuplexCallCancelled(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            request_iterator = _full_duplex_request_iterator(self._requests_pb2)
            responses = stub.FullDuplexCall(
                request_iterator, test_constants.LONG_TIMEOUT
            )
            next(responses)
            responses.cancel()
            with self.assertRaises(face.CancellationError):
                next(responses)

    def testFullDuplexCallFailed(self):
        self._protoc()

        request_iterator = _full_duplex_request_iterator(self._requests_pb2)
        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            with methods.fail():
                responses = stub.FullDuplexCall(
                    request_iterator, test_constants.LONG_TIMEOUT
                )
                self.assertIsNotNone(responses)
                with self.assertRaises(face.RemoteError):
                    next(responses)

    def testHalfDuplexCall(self):
        self._protoc()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):

            def half_duplex_request_iterator():
                request = self._requests_pb2.StreamingOutputCallRequest()
                request.response_parameters.add(size=1, interval_us=0)
                yield request
                request = self._requests_pb2.StreamingOutputCallRequest()
                request.response_parameters.add(size=2, interval_us=0)
                request.response_parameters.add(size=3, interval_us=0)
                yield request

            responses = stub.HalfDuplexCall(
                half_duplex_request_iterator(), test_constants.LONG_TIMEOUT
            )
            expected_responses = methods.HalfDuplexCall(
                half_duplex_request_iterator(), "not a real RpcContext!"
            )
            for check in itertools.zip_longest(expected_responses, responses):
                expected_response, response = check
                self.assertEqual(expected_response, response)

    def testHalfDuplexCallWedged(self):
        self._protoc()

        condition = threading.Condition()
        # One-element list used as a mutable cell shared by the closures
        # below; True means the request iterator must keep waiting.
        wait_cell = [False]

        @contextlib.contextmanager
        def wait():  # pylint: disable=invalid-name
            with condition:
                wait_cell[0] = True
            try:
                yield
            finally:
                # Always release the request iterator, even when the managed
                # block raises, so its consumer is not left blocked.
                with condition:
                    wait_cell[0] = False
                    condition.notify_all()

        def half_duplex_request_iterator():
            request = self._requests_pb2.StreamingOutputCallRequest()
            request.response_parameters.add(size=1, interval_us=0)
            yield request
            # Hold the request stream open for as long as wait() is active.
            with condition:
                while wait_cell[0]:
                    condition.wait()

        with _CreateService(
            self._payload_pb2, self._responses_pb2, self._service_pb2
        ) as (methods, stub):
            with wait():
                responses = stub.HalfDuplexCall(
                    half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT
                )
                # half-duplex waits for the client to send all info
                with self.assertRaises(face.ExpirationError):
                    next(responses)
718
719
# When executed as a script, run this module's tests with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)
722