1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Test of RPCs made against gRPC Python's application-layer API.""" 15 16from concurrent import futures 17import itertools 18import logging 19import threading 20import unittest 21 22import grpc 23from grpc.framework.foundation import logging_pool 24 25from tests.unit._rpc_test_helpers import ( 26 stream_stream_non_blocking_multi_callable, 27) 28from tests.unit._rpc_test_helpers import ( 29 unary_stream_non_blocking_multi_callable, 30) 31from tests.unit._rpc_test_helpers import BaseRPCTest 32from tests.unit._rpc_test_helpers import Callback 33from tests.unit._rpc_test_helpers import TIMEOUT_SHORT 34from tests.unit._rpc_test_helpers import stream_stream_multi_callable 35from tests.unit._rpc_test_helpers import stream_unary_multi_callable 36from tests.unit._rpc_test_helpers import unary_stream_multi_callable 37from tests.unit._rpc_test_helpers import unary_unary_multi_callable 38from tests.unit.framework.common import test_constants 39 40 41class RPCPart2Test(BaseRPCTest, unittest.TestCase): 42 def testDefaultThreadPoolIsUsed(self): 43 self._consume_one_stream_response_unary_request( 44 unary_stream_multi_callable(self._channel) 45 ) 46 self.assertFalse(self._thread_pool.was_used()) 47 48 def testExperimentalThreadPoolIsUsed(self): 49 self._consume_one_stream_response_unary_request( 50 unary_stream_non_blocking_multi_callable(self._channel) 51 ) 52 self.assertTrue(self._thread_pool.was_used()) 53 54 def testUnrecognizedMethod(self): 55 request = b"abc" 56 57 with self.assertRaises(grpc.RpcError) as exception_context: 58 self._channel.unary_unary("NoSuchMethod")(request) 59 60 self.assertEqual( 61 grpc.StatusCode.UNIMPLEMENTED, exception_context.exception.code() 62 ) 63 64 def testSuccessfulUnaryRequestBlockingUnaryResponse(self): 65 request = b"\x07\x08" 66 expected_response = self._handler.handle_unary_unary(request, None) 67 68 multi_callable = unary_unary_multi_callable(self._channel) 69 response = multi_callable( 70 request, 71 metadata=(("test", "SuccessfulUnaryRequestBlockingUnaryResponse"),), 72 ) 73 74 self.assertEqual(expected_response, response) 75 76 def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self): 77 request = b"\x07\x08" 78 expected_response = self._handler.handle_unary_unary(request, None) 79 80 multi_callable = unary_unary_multi_callable(self._channel) 81 response, call = multi_callable.with_call( 82 request, 83 metadata=( 84 ("test", "SuccessfulUnaryRequestBlockingUnaryResponseWithCall"), 85 ), 86 ) 87 88 self.assertEqual(expected_response, response) 89 self.assertIs(grpc.StatusCode.OK, call.code()) 90 self.assertEqual("", call.debug_error_string()) 91 92 def testSuccessfulUnaryRequestFutureUnaryResponse(self): 93 request = b"\x07\x08" 94 expected_response = self._handler.handle_unary_unary(request, None) 95 96 multi_callable = unary_unary_multi_callable(self._channel) 97 response_future = multi_callable.future( 98 request, 99 metadata=(("test", "SuccessfulUnaryRequestFutureUnaryResponse"),), 100 ) 101 response = response_future.result() 102 103 self.assertIsInstance(response_future, grpc.Future) 104 self.assertIsInstance(response_future, grpc.Call) 105 self.assertEqual(expected_response, response) 106 self.assertIsNone(response_future.exception()) 107 self.assertIsNone(response_future.traceback()) 108 109 def testSuccessfulUnaryRequestStreamResponse(self): 110 request = b"\x37\x58" 111 expected_responses = tuple( 112 self._handler.handle_unary_stream(request, None) 113 ) 114 115 multi_callable = unary_stream_multi_callable(self._channel) 116 response_iterator = multi_callable( 117 request, 118 metadata=(("test", "SuccessfulUnaryRequestStreamResponse"),), 119 ) 120 responses = tuple(response_iterator) 121 122 self.assertSequenceEqual(expected_responses, responses) 123 124 def testSuccessfulStreamRequestBlockingUnaryResponse(self): 125 requests = tuple( 126 b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH) 127 ) 128 expected_response = self._handler.handle_stream_unary( 129 iter(requests), None 130 ) 131 request_iterator = iter(requests) 132 133 multi_callable = stream_unary_multi_callable(self._channel) 134 response = multi_callable( 135 request_iterator, 136 metadata=( 137 ("test", "SuccessfulStreamRequestBlockingUnaryResponse"), 138 ), 139 ) 140 141 self.assertEqual(expected_response, response) 142 143 def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self): 144 requests = tuple( 145 b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH) 146 ) 147 expected_response = self._handler.handle_stream_unary( 148 iter(requests), None 149 ) 150 request_iterator = iter(requests) 151 152 multi_callable = stream_unary_multi_callable(self._channel) 153 response, call = multi_callable.with_call( 154 request_iterator, 155 metadata=( 156 ( 157 "test", 158 "SuccessfulStreamRequestBlockingUnaryResponseWithCall", 159 ), 160 ), 161 ) 162 163 self.assertEqual(expected_response, response) 164 self.assertIs(grpc.StatusCode.OK, call.code()) 165 166 def testSuccessfulStreamRequestFutureUnaryResponse(self): 167 requests = tuple( 168 b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH) 169 ) 170 expected_response = self._handler.handle_stream_unary( 171 iter(requests), None 172 ) 173 request_iterator = iter(requests) 174 175 multi_callable = stream_unary_multi_callable(self._channel) 176 response_future = multi_callable.future( 177 request_iterator, 178 metadata=(("test", "SuccessfulStreamRequestFutureUnaryResponse"),), 179 ) 180 response = response_future.result() 181 182 self.assertEqual(expected_response, response) 183 self.assertIsNone(response_future.exception()) 184 self.assertIsNone(response_future.traceback()) 185 186 def testSuccessfulStreamRequestStreamResponse(self): 187 requests = tuple( 188 b"\x77\x58" for _ in range(test_constants.STREAM_LENGTH) 189 ) 190 191 expected_responses = tuple( 192 self._handler.handle_stream_stream(iter(requests), None) 193 ) 194 request_iterator = iter(requests) 195 196 multi_callable = stream_stream_multi_callable(self._channel) 197 response_iterator = multi_callable( 198 request_iterator, 199 metadata=(("test", "SuccessfulStreamRequestStreamResponse"),), 200 ) 201 responses = tuple(response_iterator) 202 203 self.assertSequenceEqual(expected_responses, responses) 204 205 def testSequentialInvocations(self): 206 first_request = b"\x07\x08" 207 second_request = b"\x0809" 208 expected_first_response = self._handler.handle_unary_unary( 209 first_request, None 210 ) 211 expected_second_response = self._handler.handle_unary_unary( 212 second_request, None 213 ) 214 215 multi_callable = unary_unary_multi_callable(self._channel) 216 first_response = multi_callable( 217 first_request, metadata=(("test", "SequentialInvocations"),) 218 ) 219 second_response = multi_callable( 220 second_request, metadata=(("test", "SequentialInvocations"),) 221 ) 222 223 self.assertEqual(expected_first_response, first_response) 224 self.assertEqual(expected_second_response, second_response) 225 226 def testConcurrentBlockingInvocations(self): 227 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 228 requests = tuple( 229 b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH) 230 ) 231 expected_response = self._handler.handle_stream_unary( 232 iter(requests), None 233 ) 234 expected_responses = [ 235 expected_response 236 ] * test_constants.THREAD_CONCURRENCY 237 response_futures = [None] * test_constants.THREAD_CONCURRENCY 238 239 multi_callable = stream_unary_multi_callable(self._channel) 240 for index in range(test_constants.THREAD_CONCURRENCY): 241 request_iterator = iter(requests) 242 response_future = pool.submit( 243 multi_callable, 244 request_iterator, 245 metadata=(("test", "ConcurrentBlockingInvocations"),), 246 ) 247 response_futures[index] = response_future 248 responses = tuple( 249 response_future.result() for response_future in response_futures 250 ) 251 252 pool.shutdown(wait=True) 253 self.assertSequenceEqual(expected_responses, responses) 254 255 def testConcurrentFutureInvocations(self): 256 requests = tuple( 257 b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH) 258 ) 259 expected_response = self._handler.handle_stream_unary( 260 iter(requests), None 261 ) 262 expected_responses = [ 263 expected_response 264 ] * test_constants.THREAD_CONCURRENCY 265 response_futures = [None] * test_constants.THREAD_CONCURRENCY 266 267 multi_callable = stream_unary_multi_callable(self._channel) 268 for index in range(test_constants.THREAD_CONCURRENCY): 269 request_iterator = iter(requests) 270 response_future = multi_callable.future( 271 request_iterator, 272 metadata=(("test", "ConcurrentFutureInvocations"),), 273 ) 274 response_futures[index] = response_future 275 responses = tuple( 276 response_future.result() for response_future in response_futures 277 ) 278 279 self.assertSequenceEqual(expected_responses, responses) 280 281 def testWaitingForSomeButNotAllConcurrentFutureInvocations(self): 282 pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) 283 request = b"\x67\x68" 284 expected_response = self._handler.handle_unary_unary(request, None) 285 response_futures = [None] * test_constants.THREAD_CONCURRENCY 286 lock = threading.Lock() 287 test_is_running_cell = [True] 288 289 def wrap_future(future): 290 def wrap(): 291 try: 292 return future.result() 293 except grpc.RpcError: 294 with lock: 295 if test_is_running_cell[0]: 296 raise 297 return None 298 299 return wrap 300 301 multi_callable = unary_unary_multi_callable(self._channel) 302 for index in range(test_constants.THREAD_CONCURRENCY): 303 inner_response_future = multi_callable.future( 304 request, 305 metadata=( 306 ( 307 "test", 308 "WaitingForSomeButNotAllConcurrentFutureInvocations", 309 ), 310 ), 311 ) 312 outer_response_future = pool.submit( 313 wrap_future(inner_response_future) 314 ) 315 response_futures[index] = outer_response_future 316 317 some_completed_response_futures_iterator = itertools.islice( 318 futures.as_completed(response_futures), 319 test_constants.THREAD_CONCURRENCY // 2, 320 ) 321 for response_future in some_completed_response_futures_iterator: 322 self.assertEqual(expected_response, response_future.result()) 323 with lock: 324 test_is_running_cell[0] = False 325 326 def testConsumingOneStreamResponseUnaryRequest(self): 327 self._consume_one_stream_response_unary_request( 328 unary_stream_multi_callable(self._channel) 329 ) 330 331 def testConsumingOneStreamResponseUnaryRequestNonBlocking(self): 332 self._consume_one_stream_response_unary_request( 333 unary_stream_non_blocking_multi_callable(self._channel) 334 ) 335 336 def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self): 337 self._consume_some_but_not_all_stream_responses_unary_request( 338 unary_stream_multi_callable(self._channel) 339 ) 340 341 def testConsumingSomeButNotAllStreamResponsesUnaryRequestNonBlocking(self): 342 self._consume_some_but_not_all_stream_responses_unary_request( 343 unary_stream_non_blocking_multi_callable(self._channel) 344 ) 345 346 def testConsumingSomeButNotAllStreamResponsesStreamRequest(self): 347 self._consume_some_but_not_all_stream_responses_stream_request( 348 stream_stream_multi_callable(self._channel) 349 ) 350 351 def testConsumingSomeButNotAllStreamResponsesStreamRequestNonBlocking(self): 352 self._consume_some_but_not_all_stream_responses_stream_request( 353 stream_stream_non_blocking_multi_callable(self._channel) 354 ) 355 356 def testConsumingTooManyStreamResponsesStreamRequest(self): 357 self._consume_too_many_stream_responses_stream_request( 358 stream_stream_multi_callable(self._channel) 359 ) 360 361 def testConsumingTooManyStreamResponsesStreamRequestNonBlocking(self): 362 self._consume_too_many_stream_responses_stream_request( 363 stream_stream_non_blocking_multi_callable(self._channel) 364 ) 365 366 def testCancelledUnaryRequestUnaryResponse(self): 367 request = b"\x07\x17" 368 369 multi_callable = unary_unary_multi_callable(self._channel) 370 with self._control.pause(): 371 response_future = multi_callable.future( 372 request, 373 metadata=(("test", "CancelledUnaryRequestUnaryResponse"),), 374 ) 375 response_future.cancel() 376 377 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code()) 378 self.assertTrue(response_future.cancelled()) 379 with self.assertRaises(grpc.FutureCancelledError): 380 response_future.result() 381 with self.assertRaises(grpc.FutureCancelledError): 382 response_future.exception() 383 with self.assertRaises(grpc.FutureCancelledError): 384 response_future.traceback() 385 386 def testCancelledUnaryRequestStreamResponse(self): 387 self._cancelled_unary_request_stream_response( 388 unary_stream_multi_callable(self._channel) 389 ) 390 391 def testCancelledUnaryRequestStreamResponseNonBlocking(self): 392 self._cancelled_unary_request_stream_response( 393 unary_stream_non_blocking_multi_callable(self._channel) 394 ) 395 396 def testCancelledStreamRequestUnaryResponse(self): 397 requests = tuple( 398 b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH) 399 ) 400 request_iterator = iter(requests) 401 402 multi_callable = stream_unary_multi_callable(self._channel) 403 with self._control.pause(): 404 response_future = multi_callable.future( 405 request_iterator, 406 metadata=(("test", "CancelledStreamRequestUnaryResponse"),), 407 ) 408 self._control.block_until_paused() 409 response_future.cancel() 410 411 self.assertIs(grpc.StatusCode.CANCELLED, response_future.code()) 412 self.assertTrue(response_future.cancelled()) 413 with self.assertRaises(grpc.FutureCancelledError): 414 response_future.result() 415 with self.assertRaises(grpc.FutureCancelledError): 416 response_future.exception() 417 with self.assertRaises(grpc.FutureCancelledError): 418 response_future.traceback() 419 self.assertIsNotNone(response_future.initial_metadata()) 420 self.assertIsNotNone(response_future.details()) 421 self.assertIsNotNone(response_future.trailing_metadata()) 422 423 def testCancelledStreamRequestStreamResponse(self): 424 self._cancelled_stream_request_stream_response( 425 stream_stream_multi_callable(self._channel) 426 ) 427 428 def testCancelledStreamRequestStreamResponseNonBlocking(self): 429 self._cancelled_stream_request_stream_response( 430 stream_stream_non_blocking_multi_callable(self._channel) 431 ) 432 433 def testExpiredUnaryRequestBlockingUnaryResponse(self): 434 request = b"\x07\x17" 435 436 multi_callable = unary_unary_multi_callable(self._channel) 437 with self._control.pause(): 438 with self.assertRaises(grpc.RpcError) as exception_context: 439 multi_callable.with_call( 440 request, 441 timeout=TIMEOUT_SHORT, 442 metadata=( 443 ("test", "ExpiredUnaryRequestBlockingUnaryResponse"), 444 ), 445 ) 446 447 self.assertIsInstance(exception_context.exception, grpc.Call) 448 self.assertIsNotNone(exception_context.exception.initial_metadata()) 449 self.assertIs( 450 grpc.StatusCode.DEADLINE_EXCEEDED, 451 exception_context.exception.code(), 452 ) 453 self.assertIsNotNone(exception_context.exception.details()) 454 self.assertIsNotNone(exception_context.exception.trailing_metadata()) 455 456 def testExpiredUnaryRequestFutureUnaryResponse(self): 457 request = b"\x07\x17" 458 callback = Callback() 459 460 multi_callable = unary_unary_multi_callable(self._channel) 461 with self._control.pause(): 462 response_future = multi_callable.future( 463 request, 464 timeout=TIMEOUT_SHORT, 465 metadata=(("test", "ExpiredUnaryRequestFutureUnaryResponse"),), 466 ) 467 response_future.add_done_callback(callback) 468 value_passed_to_callback = callback.value() 469 470 self.assertIs(response_future, value_passed_to_callback) 471 self.assertIsNotNone(response_future.initial_metadata()) 472 self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code()) 473 self.assertIsNotNone(response_future.details()) 474 self.assertIsNotNone(response_future.trailing_metadata()) 475 with self.assertRaises(grpc.RpcError) as exception_context: 476 response_future.result() 477 self.assertIs( 478 grpc.StatusCode.DEADLINE_EXCEEDED, 479 exception_context.exception.code(), 480 ) 481 self.assertIsInstance(response_future.exception(), grpc.RpcError) 482 self.assertIsNotNone(response_future.traceback()) 483 self.assertIs( 484 grpc.StatusCode.DEADLINE_EXCEEDED, 485 response_future.exception().code(), 486 ) 487 488 def testExpiredUnaryRequestStreamResponse(self): 489 self._expired_unary_request_stream_response( 490 unary_stream_multi_callable(self._channel) 491 ) 492 493 def testExpiredUnaryRequestStreamResponseNonBlocking(self): 494 self._expired_unary_request_stream_response( 495 unary_stream_non_blocking_multi_callable(self._channel) 496 ) 497 498 499if __name__ == "__main__": 500 logging.basicConfig() 501 unittest.main(verbosity=2) 502