1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import contextlib 16import importlib 17import itertools 18import os 19from os import path 20import pkgutil 21import shutil 22import sys 23import tempfile 24import threading 25import unittest 26 27from grpc.beta import implementations 28from grpc.beta import interfaces 29from grpc.framework.foundation import future 30from grpc.framework.interfaces.face import face 31from grpc_tools import protoc 32 33from tests.unit.framework.common import test_constants 34 35_RELATIVE_PROTO_PATH = "relative_proto_path" 36_RELATIVE_PYTHON_OUT = "relative_python_out" 37 38_PROTO_FILES_PATH_COMPONENTS = ( 39 ( 40 "beta_grpc_plugin_test", 41 "payload", 42 "test_payload.proto", 43 ), 44 ( 45 "beta_grpc_plugin_test", 46 "requests", 47 "r", 48 "test_requests.proto", 49 ), 50 ( 51 "beta_grpc_plugin_test", 52 "responses", 53 "test_responses.proto", 54 ), 55 ( 56 "beta_grpc_plugin_test", 57 "service", 58 "test_service.proto", 59 ), 60) 61 62_PAYLOAD_PB2 = "beta_grpc_plugin_test.payload.test_payload_pb2" 63_REQUESTS_PB2 = "beta_grpc_plugin_test.requests.r.test_requests_pb2" 64_RESPONSES_PB2 = "beta_grpc_plugin_test.responses.test_responses_pb2" 65_SERVICE_PB2 = "beta_grpc_plugin_test.service.test_service_pb2" 66 67# Identifiers of entities we expect to find in the generated module. 68SERVICER_IDENTIFIER = "BetaTestServiceServicer" 69STUB_IDENTIFIER = "BetaTestServiceStub" 70SERVER_FACTORY_IDENTIFIER = "beta_create_TestService_server" 71STUB_FACTORY_IDENTIFIER = "beta_create_TestService_stub" 72 73 74@contextlib.contextmanager 75def _system_path(path_insertion): 76 old_system_path = sys.path[:] 77 sys.path = sys.path[0:1] + path_insertion + sys.path[1:] 78 yield 79 sys.path = old_system_path 80 81 82def _create_directory_tree(root, path_components_sequence): 83 created = set() 84 for path_components in path_components_sequence: 85 thus_far = "" 86 for path_component in path_components: 87 relative_path = path.join(thus_far, path_component) 88 if relative_path not in created: 89 os.makedirs(path.join(root, relative_path)) 90 created.add(relative_path) 91 thus_far = path.join(thus_far, path_component) 92 93 94def _massage_proto_content(raw_proto_content): 95 imports_substituted = raw_proto_content.replace( 96 b'import "tests/protoc_plugin/protos/', 97 b'import "beta_grpc_plugin_test/', 98 ) 99 package_statement_substituted = imports_substituted.replace( 100 b"package grpc_protoc_plugin;", b"package beta_grpc_protoc_plugin;" 101 ) 102 return package_statement_substituted 103 104 105def _packagify(directory): 106 for subdirectory, _, _ in os.walk(directory): 107 init_file_name = path.join(subdirectory, "__init__.py") 108 with open(init_file_name, "wb") as init_file: 109 init_file.write(b"") 110 111 112class _ServicerMethods(object): 113 def __init__(self, payload_pb2, responses_pb2): 114 self._condition = threading.Condition() 115 self._paused = False 116 self._fail = False 117 self._payload_pb2 = payload_pb2 118 self._responses_pb2 = responses_pb2 119 120 @contextlib.contextmanager 121 def pause(self): # pylint: disable=invalid-name 122 with self._condition: 123 self._paused = True 124 yield 125 with self._condition: 126 self._paused = False 127 self._condition.notify_all() 128 129 @contextlib.contextmanager 130 def fail(self): # pylint: disable=invalid-name 131 with self._condition: 132 self._fail = True 133 yield 134 with self._condition: 135 self._fail = False 136 137 def _control(self): # pylint: disable=invalid-name 138 with self._condition: 139 if self._fail: 140 raise ValueError() 141 while self._paused: 142 self._condition.wait() 143 144 def UnaryCall(self, request, unused_rpc_context): 145 response = self._responses_pb2.SimpleResponse() 146 response.payload.payload_type = self._payload_pb2.COMPRESSABLE 147 response.payload.payload_compressable = "a" * request.response_size 148 self._control() 149 return response 150 151 def StreamingOutputCall(self, request, unused_rpc_context): 152 for parameter in request.response_parameters: 153 response = self._responses_pb2.StreamingOutputCallResponse() 154 response.payload.payload_type = self._payload_pb2.COMPRESSABLE 155 response.payload.payload_compressable = "a" * parameter.size 156 self._control() 157 yield response 158 159 def StreamingInputCall(self, request_iter, unused_rpc_context): 160 response = self._responses_pb2.StreamingInputCallResponse() 161 aggregated_payload_size = 0 162 for request in request_iter: 163 aggregated_payload_size += len(request.payload.payload_compressable) 164 response.aggregated_payload_size = aggregated_payload_size 165 self._control() 166 return response 167 168 def FullDuplexCall(self, request_iter, unused_rpc_context): 169 for request in request_iter: 170 for parameter in request.response_parameters: 171 response = self._responses_pb2.StreamingOutputCallResponse() 172 response.payload.payload_type = self._payload_pb2.COMPRESSABLE 173 response.payload.payload_compressable = "a" * parameter.size 174 self._control() 175 yield response 176 177 def HalfDuplexCall(self, request_iter, unused_rpc_context): 178 responses = [] 179 for request in request_iter: 180 for parameter in request.response_parameters: 181 response = self._responses_pb2.StreamingOutputCallResponse() 182 response.payload.payload_type = self._payload_pb2.COMPRESSABLE 183 response.payload.payload_compressable = "a" * parameter.size 184 self._control() 185 responses.append(response) 186 for response in responses: 187 yield response 188 189 190@contextlib.contextmanager 191def _CreateService(payload_pb2, responses_pb2, service_pb2): 192 """Provides a servicer backend and a stub. 193 194 The servicer is just the implementation of the actual servicer passed to the 195 face player of the python RPC implementation; the two are detached. 196 197 Yields: 198 A (servicer_methods, stub) pair where servicer_methods is the back-end of 199 the service bound to the stub and stub is the stub on which to invoke 200 RPCs. 201 """ 202 servicer_methods = _ServicerMethods(payload_pb2, responses_pb2) 203 204 class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): 205 def UnaryCall(self, request, context): 206 return servicer_methods.UnaryCall(request, context) 207 208 def StreamingOutputCall(self, request, context): 209 return servicer_methods.StreamingOutputCall(request, context) 210 211 def StreamingInputCall(self, request_iter, context): 212 return servicer_methods.StreamingInputCall(request_iter, context) 213 214 def FullDuplexCall(self, request_iter, context): 215 return servicer_methods.FullDuplexCall(request_iter, context) 216 217 def HalfDuplexCall(self, request_iter, context): 218 return servicer_methods.HalfDuplexCall(request_iter, context) 219 220 servicer = Servicer() 221 server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer) 222 port = server.add_insecure_port("[::]:0") 223 server.start() 224 channel = implementations.insecure_channel("localhost", port) 225 stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel) 226 yield servicer_methods, stub 227 server.stop(0) 228 229 230@contextlib.contextmanager 231def _CreateIncompleteService(service_pb2): 232 """Provides a servicer backend that fails to implement methods and its stub. 233 234 The servicer is just the implementation of the actual servicer passed to the 235 face player of the python RPC implementation; the two are detached. 236 Args: 237 service_pb2: The service_pb2 module generated by this test. 238 Yields: 239 A (servicer_methods, stub) pair where servicer_methods is the back-end of 240 the service bound to the stub and stub is the stub on which to invoke 241 RPCs. 242 """ 243 244 class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): 245 pass 246 247 servicer = Servicer() 248 server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer) 249 port = server.add_insecure_port("[::]:0") 250 server.start() 251 channel = implementations.insecure_channel("localhost", port) 252 stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel) 253 yield None, stub 254 server.stop(0) 255 256 257def _streaming_input_request_iterator(payload_pb2, requests_pb2): 258 for _ in range(3): 259 request = requests_pb2.StreamingInputCallRequest() 260 request.payload.payload_type = payload_pb2.COMPRESSABLE 261 request.payload.payload_compressable = "a" 262 yield request 263 264 265def _streaming_output_request(requests_pb2): 266 request = requests_pb2.StreamingOutputCallRequest() 267 sizes = [1, 2, 3] 268 request.response_parameters.add(size=sizes[0], interval_us=0) 269 request.response_parameters.add(size=sizes[1], interval_us=0) 270 request.response_parameters.add(size=sizes[2], interval_us=0) 271 return request 272 273 274def _full_duplex_request_iterator(requests_pb2): 275 request = requests_pb2.StreamingOutputCallRequest() 276 request.response_parameters.add(size=1, interval_us=0) 277 yield request 278 request = requests_pb2.StreamingOutputCallRequest() 279 request.response_parameters.add(size=2, interval_us=0) 280 request.response_parameters.add(size=3, interval_us=0) 281 yield request 282 283 284class PythonPluginTest(unittest.TestCase): 285 """Test case for the gRPC Python protoc-plugin. 286 287 While reading these tests, remember that the futures API 288 (`stub.method.future()`) only gives futures for the *response-unary* 289 methods and does not exist for response-streaming methods. 290 """ 291 292 def setUp(self): 293 self._directory = tempfile.mkdtemp(dir=".") 294 self._proto_path = path.join(self._directory, _RELATIVE_PROTO_PATH) 295 self._python_out = path.join(self._directory, _RELATIVE_PYTHON_OUT) 296 297 os.makedirs(self._proto_path) 298 os.makedirs(self._python_out) 299 300 directories_path_components = { 301 proto_file_path_components[:-1] 302 for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS 303 } 304 _create_directory_tree(self._proto_path, directories_path_components) 305 self._proto_file_names = set() 306 for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS: 307 raw_proto_content = pkgutil.get_data( 308 "tests.protoc_plugin.protos", 309 path.join(*proto_file_path_components[1:]), 310 ) 311 massaged_proto_content = _massage_proto_content(raw_proto_content) 312 proto_file_name = path.join( 313 self._proto_path, *proto_file_path_components 314 ) 315 with open(proto_file_name, "wb") as proto_file: 316 proto_file.write(massaged_proto_content) 317 self._proto_file_names.add(proto_file_name) 318 319 def tearDown(self): 320 shutil.rmtree(self._directory) 321 322 def _protoc(self): 323 args = [ 324 "", 325 "--proto_path={}".format(self._proto_path), 326 "--python_out={}".format(self._python_out), 327 "--grpc_python_out=grpc_1_0:{}".format(self._python_out), 328 ] + list(self._proto_file_names) 329 protoc_exit_code = protoc.main(args) 330 self.assertEqual(0, protoc_exit_code) 331 332 _packagify(self._python_out) 333 334 with _system_path([self._python_out]): 335 self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2) 336 self._requests_pb2 = importlib.import_module(_REQUESTS_PB2) 337 self._responses_pb2 = importlib.import_module(_RESPONSES_PB2) 338 self._service_pb2 = importlib.import_module(_SERVICE_PB2) 339 340 def testImportAttributes(self): 341 self._protoc() 342 343 # check that we can access the generated module and its members. 344 self.assertIsNotNone( 345 getattr(self._service_pb2, SERVICER_IDENTIFIER, None) 346 ) 347 self.assertIsNotNone(getattr(self._service_pb2, STUB_IDENTIFIER, None)) 348 self.assertIsNotNone( 349 getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None) 350 ) 351 self.assertIsNotNone( 352 getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None) 353 ) 354 355 def testUpDown(self): 356 self._protoc() 357 358 with _CreateService( 359 self._payload_pb2, self._responses_pb2, self._service_pb2 360 ): 361 self._requests_pb2.SimpleRequest(response_size=13) 362 363 def testIncompleteServicer(self): 364 self._protoc() 365 366 with _CreateIncompleteService(self._service_pb2) as (_, stub): 367 request = self._requests_pb2.SimpleRequest(response_size=13) 368 try: 369 stub.UnaryCall(request, test_constants.LONG_TIMEOUT) 370 except face.AbortionError as error: 371 self.assertEqual( 372 interfaces.StatusCode.UNIMPLEMENTED, error.code 373 ) 374 375 def testUnaryCall(self): 376 self._protoc() 377 378 with _CreateService( 379 self._payload_pb2, self._responses_pb2, self._service_pb2 380 ) as (methods, stub): 381 request = self._requests_pb2.SimpleRequest(response_size=13) 382 response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT) 383 expected_response = methods.UnaryCall(request, "not a real context!") 384 self.assertEqual(expected_response, response) 385 386 def testUnaryCallFuture(self): 387 self._protoc() 388 389 with _CreateService( 390 self._payload_pb2, self._responses_pb2, self._service_pb2 391 ) as (methods, stub): 392 request = self._requests_pb2.SimpleRequest(response_size=13) 393 # Check that the call does not block waiting for the server to respond. 394 with methods.pause(): 395 response_future = stub.UnaryCall.future( 396 request, test_constants.LONG_TIMEOUT 397 ) 398 response = response_future.result() 399 expected_response = methods.UnaryCall(request, "not a real RpcContext!") 400 self.assertEqual(expected_response, response) 401 402 def testUnaryCallFutureExpired(self): 403 self._protoc() 404 405 with _CreateService( 406 self._payload_pb2, self._responses_pb2, self._service_pb2 407 ) as (methods, stub): 408 request = self._requests_pb2.SimpleRequest(response_size=13) 409 with methods.pause(): 410 response_future = stub.UnaryCall.future( 411 request, test_constants.SHORT_TIMEOUT 412 ) 413 with self.assertRaises(face.ExpirationError): 414 response_future.result() 415 416 def testUnaryCallFutureCancelled(self): 417 self._protoc() 418 419 with _CreateService( 420 self._payload_pb2, self._responses_pb2, self._service_pb2 421 ) as (methods, stub): 422 request = self._requests_pb2.SimpleRequest(response_size=13) 423 with methods.pause(): 424 response_future = stub.UnaryCall.future(request, 1) 425 response_future.cancel() 426 self.assertTrue(response_future.cancelled()) 427 428 def testUnaryCallFutureFailed(self): 429 self._protoc() 430 431 with _CreateService( 432 self._payload_pb2, self._responses_pb2, self._service_pb2 433 ) as (methods, stub): 434 request = self._requests_pb2.SimpleRequest(response_size=13) 435 with methods.fail(): 436 response_future = stub.UnaryCall.future( 437 request, test_constants.LONG_TIMEOUT 438 ) 439 self.assertIsNotNone(response_future.exception()) 440 441 def testStreamingOutputCall(self): 442 self._protoc() 443 444 with _CreateService( 445 self._payload_pb2, self._responses_pb2, self._service_pb2 446 ) as (methods, stub): 447 request = _streaming_output_request(self._requests_pb2) 448 responses = stub.StreamingOutputCall( 449 request, test_constants.LONG_TIMEOUT 450 ) 451 expected_responses = methods.StreamingOutputCall( 452 request, "not a real RpcContext!" 453 ) 454 for expected_response, response in itertools.zip_longest( 455 expected_responses, responses 456 ): 457 self.assertEqual(expected_response, response) 458 459 def testStreamingOutputCallExpired(self): 460 self._protoc() 461 462 with _CreateService( 463 self._payload_pb2, self._responses_pb2, self._service_pb2 464 ) as (methods, stub): 465 request = _streaming_output_request(self._requests_pb2) 466 with methods.pause(): 467 responses = stub.StreamingOutputCall( 468 request, test_constants.SHORT_TIMEOUT 469 ) 470 with self.assertRaises(face.ExpirationError): 471 list(responses) 472 473 def testStreamingOutputCallCancelled(self): 474 self._protoc() 475 476 with _CreateService( 477 self._payload_pb2, self._responses_pb2, self._service_pb2 478 ) as (methods, stub): 479 request = _streaming_output_request(self._requests_pb2) 480 responses = stub.StreamingOutputCall( 481 request, test_constants.LONG_TIMEOUT 482 ) 483 next(responses) 484 responses.cancel() 485 with self.assertRaises(face.CancellationError): 486 next(responses) 487 488 def testStreamingOutputCallFailed(self): 489 self._protoc() 490 491 with _CreateService( 492 self._payload_pb2, self._responses_pb2, self._service_pb2 493 ) as (methods, stub): 494 request = _streaming_output_request(self._requests_pb2) 495 with methods.fail(): 496 responses = stub.StreamingOutputCall(request, 1) 497 self.assertIsNotNone(responses) 498 with self.assertRaises(face.RemoteError): 499 next(responses) 500 501 def testStreamingInputCall(self): 502 self._protoc() 503 504 with _CreateService( 505 self._payload_pb2, self._responses_pb2, self._service_pb2 506 ) as (methods, stub): 507 response = stub.StreamingInputCall( 508 _streaming_input_request_iterator( 509 self._payload_pb2, self._requests_pb2 510 ), 511 test_constants.LONG_TIMEOUT, 512 ) 513 expected_response = methods.StreamingInputCall( 514 _streaming_input_request_iterator( 515 self._payload_pb2, self._requests_pb2 516 ), 517 "not a real RpcContext!", 518 ) 519 self.assertEqual(expected_response, response) 520 521 def testStreamingInputCallFuture(self): 522 self._protoc() 523 524 with _CreateService( 525 self._payload_pb2, self._responses_pb2, self._service_pb2 526 ) as (methods, stub): 527 with methods.pause(): 528 response_future = stub.StreamingInputCall.future( 529 _streaming_input_request_iterator( 530 self._payload_pb2, self._requests_pb2 531 ), 532 test_constants.LONG_TIMEOUT, 533 ) 534 response = response_future.result() 535 expected_response = methods.StreamingInputCall( 536 _streaming_input_request_iterator( 537 self._payload_pb2, self._requests_pb2 538 ), 539 "not a real RpcContext!", 540 ) 541 self.assertEqual(expected_response, response) 542 543 def testStreamingInputCallFutureExpired(self): 544 self._protoc() 545 546 with _CreateService( 547 self._payload_pb2, self._responses_pb2, self._service_pb2 548 ) as (methods, stub): 549 with methods.pause(): 550 response_future = stub.StreamingInputCall.future( 551 _streaming_input_request_iterator( 552 self._payload_pb2, self._requests_pb2 553 ), 554 test_constants.SHORT_TIMEOUT, 555 ) 556 with self.assertRaises(face.ExpirationError): 557 response_future.result() 558 self.assertIsInstance( 559 response_future.exception(), face.ExpirationError 560 ) 561 562 def testStreamingInputCallFutureCancelled(self): 563 self._protoc() 564 565 with _CreateService( 566 self._payload_pb2, self._responses_pb2, self._service_pb2 567 ) as (methods, stub): 568 with methods.pause(): 569 response_future = stub.StreamingInputCall.future( 570 _streaming_input_request_iterator( 571 self._payload_pb2, self._requests_pb2 572 ), 573 test_constants.LONG_TIMEOUT, 574 ) 575 response_future.cancel() 576 self.assertTrue(response_future.cancelled()) 577 with self.assertRaises(future.CancelledError): 578 response_future.result() 579 580 def testStreamingInputCallFutureFailed(self): 581 self._protoc() 582 583 with _CreateService( 584 self._payload_pb2, self._responses_pb2, self._service_pb2 585 ) as (methods, stub): 586 with methods.fail(): 587 response_future = stub.StreamingInputCall.future( 588 _streaming_input_request_iterator( 589 self._payload_pb2, self._requests_pb2 590 ), 591 test_constants.LONG_TIMEOUT, 592 ) 593 self.assertIsNotNone(response_future.exception()) 594 595 def testFullDuplexCall(self): 596 self._protoc() 597 598 with _CreateService( 599 self._payload_pb2, self._responses_pb2, self._service_pb2 600 ) as (methods, stub): 601 responses = stub.FullDuplexCall( 602 _full_duplex_request_iterator(self._requests_pb2), 603 test_constants.LONG_TIMEOUT, 604 ) 605 expected_responses = methods.FullDuplexCall( 606 _full_duplex_request_iterator(self._requests_pb2), 607 "not a real RpcContext!", 608 ) 609 for expected_response, response in itertools.zip_longest( 610 expected_responses, responses 611 ): 612 self.assertEqual(expected_response, response) 613 614 def testFullDuplexCallExpired(self): 615 self._protoc() 616 617 request_iterator = _full_duplex_request_iterator(self._requests_pb2) 618 with _CreateService( 619 self._payload_pb2, self._responses_pb2, self._service_pb2 620 ) as (methods, stub): 621 with methods.pause(): 622 responses = stub.FullDuplexCall( 623 request_iterator, test_constants.SHORT_TIMEOUT 624 ) 625 with self.assertRaises(face.ExpirationError): 626 list(responses) 627 628 def testFullDuplexCallCancelled(self): 629 self._protoc() 630 631 with _CreateService( 632 self._payload_pb2, self._responses_pb2, self._service_pb2 633 ) as (methods, stub): 634 request_iterator = _full_duplex_request_iterator(self._requests_pb2) 635 responses = stub.FullDuplexCall( 636 request_iterator, test_constants.LONG_TIMEOUT 637 ) 638 next(responses) 639 responses.cancel() 640 with self.assertRaises(face.CancellationError): 641 next(responses) 642 643 def testFullDuplexCallFailed(self): 644 self._protoc() 645 646 request_iterator = _full_duplex_request_iterator(self._requests_pb2) 647 with _CreateService( 648 self._payload_pb2, self._responses_pb2, self._service_pb2 649 ) as (methods, stub): 650 with methods.fail(): 651 responses = stub.FullDuplexCall( 652 request_iterator, test_constants.LONG_TIMEOUT 653 ) 654 self.assertIsNotNone(responses) 655 with self.assertRaises(face.RemoteError): 656 next(responses) 657 658 def testHalfDuplexCall(self): 659 self._protoc() 660 661 with _CreateService( 662 self._payload_pb2, self._responses_pb2, self._service_pb2 663 ) as (methods, stub): 664 665 def half_duplex_request_iterator(): 666 request = self._requests_pb2.StreamingOutputCallRequest() 667 request.response_parameters.add(size=1, interval_us=0) 668 yield request 669 request = self._requests_pb2.StreamingOutputCallRequest() 670 request.response_parameters.add(size=2, interval_us=0) 671 request.response_parameters.add(size=3, interval_us=0) 672 yield request 673 674 responses = stub.HalfDuplexCall( 675 half_duplex_request_iterator(), test_constants.LONG_TIMEOUT 676 ) 677 expected_responses = methods.HalfDuplexCall( 678 half_duplex_request_iterator(), "not a real RpcContext!" 679 ) 680 for check in itertools.zip_longest(expected_responses, responses): 681 expected_response, response = check 682 self.assertEqual(expected_response, response) 683 684 def testHalfDuplexCallWedged(self): 685 self._protoc() 686 687 condition = threading.Condition() 688 wait_cell = [False] 689 690 @contextlib.contextmanager 691 def wait(): # pylint: disable=invalid-name 692 # Where's Python 3's 'nonlocal' statement when you need it? 693 with condition: 694 wait_cell[0] = True 695 yield 696 with condition: 697 wait_cell[0] = False 698 condition.notify_all() 699 700 def half_duplex_request_iterator(): 701 request = self._requests_pb2.StreamingOutputCallRequest() 702 request.response_parameters.add(size=1, interval_us=0) 703 yield request 704 with condition: 705 while wait_cell[0]: 706 condition.wait() 707 708 with _CreateService( 709 self._payload_pb2, self._responses_pb2, self._service_pb2 710 ) as (methods, stub): 711 with wait(): 712 responses = stub.HalfDuplexCall( 713 half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT 714 ) 715 # half-duplex waits for the client to send all info 716 with self.assertRaises(face.ExpirationError): 717 next(responses) 718 719 720if __name__ == "__main__": 721 unittest.main(verbosity=2) 722