xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test of RPCs made against gRPC Python's application-layer API."""
15
16from concurrent import futures
17import itertools
18import logging
19import threading
20import unittest
21
22import grpc
23from grpc.framework.foundation import logging_pool
24
25from tests.unit._rpc_test_helpers import (
26    stream_stream_non_blocking_multi_callable,
27)
28from tests.unit._rpc_test_helpers import (
29    unary_stream_non_blocking_multi_callable,
30)
31from tests.unit._rpc_test_helpers import BaseRPCTest
32from tests.unit._rpc_test_helpers import Callback
33from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
34from tests.unit._rpc_test_helpers import stream_stream_multi_callable
35from tests.unit._rpc_test_helpers import stream_unary_multi_callable
36from tests.unit._rpc_test_helpers import unary_stream_multi_callable
37from tests.unit._rpc_test_helpers import unary_unary_multi_callable
38from tests.unit.framework.common import test_constants
39
40
class RPCPart2Test(BaseRPCTest, unittest.TestCase):
    """Second batch of RPC tests against the application-layer API.

    The tests read ``self._channel``, ``self._handler``, ``self._control`` and
    ``self._thread_pool`` and call several ``self._consume_*`` /
    ``self._cancelled_*`` / ``self._expired_*`` scenario helpers. None of
    these is defined in this class; presumably they come from ``BaseRPCTest``
    (confirm in ``tests.unit._rpc_test_helpers``).

    Expected responses are obtained by calling the handler's ``handle_*``
    methods directly and are then compared with what the channel returns.
    """

    def testDefaultThreadPoolIsUsed(self):
        """Regular unary-stream multi-callable leaves ``_thread_pool`` unused."""
        self._consume_one_stream_response_unary_request(
            unary_stream_multi_callable(self._channel)
        )
        self.assertFalse(self._thread_pool.was_used())

    def testExperimentalThreadPoolIsUsed(self):
        """Non-blocking unary-stream multi-callable uses ``_thread_pool``."""
        self._consume_one_stream_response_unary_request(
            unary_stream_non_blocking_multi_callable(self._channel)
        )
        self.assertTrue(self._thread_pool.was_used())

    def testUnrecognizedMethod(self):
        """Calling the "NoSuchMethod" method fails with UNIMPLEMENTED."""
        request = b"abc"

        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary("NoSuchMethod")(request)

        self.assertEqual(
            grpc.StatusCode.UNIMPLEMENTED, exception_context.exception.code()
        )

    def testSuccessfulUnaryRequestBlockingUnaryResponse(self):
        """Blocking unary-unary call returns the handler's response."""
        request = b"\x07\x08"
        expected_response = self._handler.handle_unary_unary(request, None)

        multi_callable = unary_unary_multi_callable(self._channel)
        response = multi_callable(
            request,
            metadata=(("test", "SuccessfulUnaryRequestBlockingUnaryResponse"),),
        )

        self.assertEqual(expected_response, response)

    def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self):
        """``with_call`` returns the response plus a call reporting OK.

        Also checks that the call's ``debug_error_string()`` is empty.
        """
        request = b"\x07\x08"
        expected_response = self._handler.handle_unary_unary(request, None)

        multi_callable = unary_unary_multi_callable(self._channel)
        response, call = multi_callable.with_call(
            request,
            metadata=(
                ("test", "SuccessfulUnaryRequestBlockingUnaryResponseWithCall"),
            ),
        )

        self.assertEqual(expected_response, response)
        self.assertIs(grpc.StatusCode.OK, call.code())
        self.assertEqual("", call.debug_error_string())

    def testSuccessfulUnaryRequestFutureUnaryResponse(self):
        """``future`` yields an object that is both a Future and a Call.

        Its result must equal the handler's response, and both
        ``exception()`` and ``traceback()`` must be ``None``.
        """
        request = b"\x07\x08"
        expected_response = self._handler.handle_unary_unary(request, None)

        multi_callable = unary_unary_multi_callable(self._channel)
        response_future = multi_callable.future(
            request,
            metadata=(("test", "SuccessfulUnaryRequestFutureUnaryResponse"),),
        )
        response = response_future.result()

        self.assertIsInstance(response_future, grpc.Future)
        self.assertIsInstance(response_future, grpc.Call)
        self.assertEqual(expected_response, response)
        self.assertIsNone(response_future.exception())
        self.assertIsNone(response_future.traceback())

    def testSuccessfulUnaryRequestStreamResponse(self):
        """Unary-stream call yields the same sequence as the handler."""
        request = b"\x37\x58"
        expected_responses = tuple(
            self._handler.handle_unary_stream(request, None)
        )

        multi_callable = unary_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request,
            metadata=(("test", "SuccessfulUnaryRequestStreamResponse"),),
        )
        responses = tuple(response_iterator)

        self.assertSequenceEqual(expected_responses, responses)

    def testSuccessfulStreamRequestBlockingUnaryResponse(self):
        """Blocking stream-unary call returns the handler's response."""
        requests = tuple(
            b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH)
        )
        # A fresh iterator is used for each consumer: one for the direct
        # handler call and one for the RPC.
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None
        )
        request_iterator = iter(requests)

        multi_callable = stream_unary_multi_callable(self._channel)
        response = multi_callable(
            request_iterator,
            metadata=(
                ("test", "SuccessfulStreamRequestBlockingUnaryResponse"),
            ),
        )

        self.assertEqual(expected_response, response)

    def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self):
        """Stream-unary ``with_call`` returns the response and an OK call."""
        requests = tuple(
            b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH)
        )
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None
        )
        request_iterator = iter(requests)

        multi_callable = stream_unary_multi_callable(self._channel)
        response, call = multi_callable.with_call(
            request_iterator,
            metadata=(
                (
                    "test",
                    "SuccessfulStreamRequestBlockingUnaryResponseWithCall",
                ),
            ),
        )

        self.assertEqual(expected_response, response)
        self.assertIs(grpc.StatusCode.OK, call.code())

    def testSuccessfulStreamRequestFutureUnaryResponse(self):
        """Stream-unary ``future`` resolves to the handler's response.

        ``exception()`` and ``traceback()`` must both be ``None``.
        """
        requests = tuple(
            b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH)
        )
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None
        )
        request_iterator = iter(requests)

        multi_callable = stream_unary_multi_callable(self._channel)
        response_future = multi_callable.future(
            request_iterator,
            metadata=(("test", "SuccessfulStreamRequestFutureUnaryResponse"),),
        )
        response = response_future.result()

        self.assertEqual(expected_response, response)
        self.assertIsNone(response_future.exception())
        self.assertIsNone(response_future.traceback())

    def testSuccessfulStreamRequestStreamResponse(self):
        """Stream-stream call yields the same sequence as the handler."""
        requests = tuple(
            b"\x77\x58" for _ in range(test_constants.STREAM_LENGTH)
        )

        expected_responses = tuple(
            self._handler.handle_stream_stream(iter(requests), None)
        )
        request_iterator = iter(requests)

        multi_callable = stream_stream_multi_callable(self._channel)
        response_iterator = multi_callable(
            request_iterator,
            metadata=(("test", "SuccessfulStreamRequestStreamResponse"),),
        )
        responses = tuple(response_iterator)

        self.assertSequenceEqual(expected_responses, responses)

    def testSequentialInvocations(self):
        """One multi-callable can be invoked twice in a row.

        Each invocation must return the handler's response for its own
        request.
        """
        first_request = b"\x07\x08"
        # NOTE(review): this literal is three bytes (0x08, "0", "9"); possibly
        # b"\x08\x09" was intended. Harmless here, since any payload distinct
        # from first_request serves the test.
        second_request = b"\x0809"
        expected_first_response = self._handler.handle_unary_unary(
            first_request, None
        )
        expected_second_response = self._handler.handle_unary_unary(
            second_request, None
        )

        multi_callable = unary_unary_multi_callable(self._channel)
        first_response = multi_callable(
            first_request, metadata=(("test", "SequentialInvocations"),)
        )
        second_response = multi_callable(
            second_request, metadata=(("test", "SequentialInvocations"),)
        )

        self.assertEqual(expected_first_response, first_response)
        self.assertEqual(expected_second_response, second_response)

    def testConcurrentBlockingInvocations(self):
        """Blocking stream-unary calls issued from a thread pool all succeed.

        ``THREAD_CONCURRENCY`` blocking invocations are submitted to a local
        pool; every result must equal the handler's response.
        """
        requests = tuple(
            b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH)
        )
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None
        )
        expected_responses = [
            expected_response
        ] * test_constants.THREAD_CONCURRENCY

        multi_callable = stream_unary_multi_callable(self._channel)
        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        try:
            # Each invocation gets its own request iterator.
            response_futures = [
                pool.submit(
                    multi_callable,
                    iter(requests),
                    metadata=(("test", "ConcurrentBlockingInvocations"),),
                )
                for _ in range(test_constants.THREAD_CONCURRENCY)
            ]
            responses = tuple(
                response_future.result()
                for response_future in response_futures
            )
        finally:
            # Shut the pool down even when an RPC raised, so a failing test
            # does not leave worker threads behind.
            pool.shutdown(wait=True)

        self.assertSequenceEqual(expected_responses, responses)

    def testConcurrentFutureInvocations(self):
        """Several outstanding stream-unary futures all resolve correctly.

        ``THREAD_CONCURRENCY`` futures are started before any result is
        awaited; every result must equal the handler's response.
        """
        requests = tuple(
            b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH)
        )
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None
        )
        expected_responses = [
            expected_response
        ] * test_constants.THREAD_CONCURRENCY

        multi_callable = stream_unary_multi_callable(self._channel)
        # Each invocation gets its own request iterator.
        response_futures = [
            multi_callable.future(
                iter(requests),
                metadata=(("test", "ConcurrentFutureInvocations"),),
            )
            for _ in range(test_constants.THREAD_CONCURRENCY)
        ]
        responses = tuple(
            response_future.result() for response_future in response_futures
        )

        self.assertSequenceEqual(expected_responses, responses)

    def testWaitingForSomeButNotAllConcurrentFutureInvocations(self):
        """Only half of the concurrent futures are awaited before returning.

        ``THREAD_CONCURRENCY`` unary-unary futures are started and each one's
        ``result()`` is awaited on a pool thread. The test checks the first
        ``THREAD_CONCURRENCY // 2`` to complete and then returns without
        waiting for the rest.
        """
        request = b"\x67\x68"
        expected_response = self._handler.handle_unary_unary(request, None)
        lock = threading.Lock()
        # One-element list used as a mutable flag shared with the closures.
        test_is_running_cell = [True]

        def wrap_future(future):
            """Return a callable that waits on ``future``.

            An ``RpcError`` is re-raised only while the test is still
            running; once the flag is cleared it is swallowed and ``None``
            is returned.
            """

            def wrap():
                try:
                    return future.result()
                except grpc.RpcError:
                    with lock:
                        if test_is_running_cell[0]:
                            raise
                    return None

            return wrap

        multi_callable = unary_unary_multi_callable(self._channel)
        metadata = (
            (
                "test",
                "WaitingForSomeButNotAllConcurrentFutureInvocations",
            ),
        )
        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        try:
            response_futures = []
            for _ in range(test_constants.THREAD_CONCURRENCY):
                inner_response_future = multi_callable.future(
                    request, metadata=metadata
                )
                response_futures.append(
                    pool.submit(wrap_future(inner_response_future))
                )

            some_completed_response_futures_iterator = itertools.islice(
                futures.as_completed(response_futures),
                test_constants.THREAD_CONCURRENCY // 2,
            )
            for response_future in some_completed_response_futures_iterator:
                self.assertEqual(expected_response, response_future.result())
        finally:
            # Clear the flag on every exit path so wrappers still waiting
            # swallow late RpcErrors instead of raising on pool threads.
            with lock:
                test_is_running_cell[0] = False
            # wait=False: release the executor without waiting for the
            # futures this test deliberately leaves outstanding.
            pool.shutdown(wait=False)

    def testConsumingOneStreamResponseUnaryRequest(self):
        """Run the consume-one-response scenario, regular unary-stream."""
        self._consume_one_stream_response_unary_request(
            unary_stream_multi_callable(self._channel)
        )

    def testConsumingOneStreamResponseUnaryRequestNonBlocking(self):
        """Run the consume-one-response scenario, non-blocking unary-stream."""
        self._consume_one_stream_response_unary_request(
            unary_stream_non_blocking_multi_callable(self._channel)
        )

    def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self):
        """Run the consume-some-responses scenario, regular unary-stream."""
        self._consume_some_but_not_all_stream_responses_unary_request(
            unary_stream_multi_callable(self._channel)
        )

    def testConsumingSomeButNotAllStreamResponsesUnaryRequestNonBlocking(self):
        """Run the consume-some-responses scenario, non-blocking unary-stream."""
        self._consume_some_but_not_all_stream_responses_unary_request(
            unary_stream_non_blocking_multi_callable(self._channel)
        )

    def testConsumingSomeButNotAllStreamResponsesStreamRequest(self):
        """Run the consume-some-responses scenario, regular stream-stream."""
        self._consume_some_but_not_all_stream_responses_stream_request(
            stream_stream_multi_callable(self._channel)
        )

    def testConsumingSomeButNotAllStreamResponsesStreamRequestNonBlocking(self):
        """Run the consume-some-responses scenario, non-blocking stream-stream."""
        self._consume_some_but_not_all_stream_responses_stream_request(
            stream_stream_non_blocking_multi_callable(self._channel)
        )

    def testConsumingTooManyStreamResponsesStreamRequest(self):
        """Run the consume-too-many-responses scenario, regular stream-stream."""
        self._consume_too_many_stream_responses_stream_request(
            stream_stream_multi_callable(self._channel)
        )

    def testConsumingTooManyStreamResponsesStreamRequestNonBlocking(self):
        """Run the consume-too-many scenario, non-blocking stream-stream."""
        self._consume_too_many_stream_responses_stream_request(
            stream_stream_non_blocking_multi_callable(self._channel)
        )

    def testCancelledUnaryRequestUnaryResponse(self):
        """A unary-unary future cancelled under ``pause()`` is CANCELLED.

        After cancellation ``result()``, ``exception()`` and ``traceback()``
        must each raise ``grpc.FutureCancelledError``.
        """
        request = b"\x07\x17"

        multi_callable = unary_unary_multi_callable(self._channel)
        # The RPC is started and cancelled inside the pause() context;
        # presumably this stalls the handler so the RPC cannot finish first
        # (confirm in _rpc_test_helpers).
        with self._control.pause():
            response_future = multi_callable.future(
                request,
                metadata=(("test", "CancelledUnaryRequestUnaryResponse"),),
            )
            response_future.cancel()

        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.exception()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.traceback()

    def testCancelledUnaryRequestStreamResponse(self):
        """Run the cancelled unary-stream scenario, regular multi-callable."""
        self._cancelled_unary_request_stream_response(
            unary_stream_multi_callable(self._channel)
        )

    def testCancelledUnaryRequestStreamResponseNonBlocking(self):
        """Run the cancelled unary-stream scenario, non-blocking multi-callable."""
        self._cancelled_unary_request_stream_response(
            unary_stream_non_blocking_multi_callable(self._channel)
        )

    def testCancelledStreamRequestUnaryResponse(self):
        """A stream-unary future cancelled under ``pause()`` is CANCELLED.

        After cancellation ``result()``, ``exception()`` and ``traceback()``
        must each raise ``grpc.FutureCancelledError``, while
        ``initial_metadata()``, ``details()`` and ``trailing_metadata()``
        must each return something other than ``None``.
        """
        requests = tuple(
            b"\x07\x08" for _ in range(test_constants.STREAM_LENGTH)
        )
        request_iterator = iter(requests)

        multi_callable = stream_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(
                request_iterator,
                metadata=(("test", "CancelledStreamRequestUnaryResponse"),),
            )
            # Cancel only after block_until_paused() returns; judging by its
            # name it waits until the pause has taken effect (confirm in
            # _rpc_test_helpers).
            self._control.block_until_paused()
            response_future.cancel()

        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.exception()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.traceback()
        self.assertIsNotNone(response_future.initial_metadata())
        self.assertIsNotNone(response_future.details())
        self.assertIsNotNone(response_future.trailing_metadata())

    def testCancelledStreamRequestStreamResponse(self):
        """Run the cancelled stream-stream scenario, regular multi-callable."""
        self._cancelled_stream_request_stream_response(
            stream_stream_multi_callable(self._channel)
        )

    def testCancelledStreamRequestStreamResponseNonBlocking(self):
        """Run the cancelled stream-stream scenario, non-blocking multi-callable."""
        self._cancelled_stream_request_stream_response(
            stream_stream_non_blocking_multi_callable(self._channel)
        )

    def testExpiredUnaryRequestBlockingUnaryResponse(self):
        """Blocking ``with_call`` under ``pause()`` hits DEADLINE_EXCEEDED.

        The call uses ``timeout=TIMEOUT_SHORT``. The raised ``RpcError`` must
        also be a ``grpc.Call`` whose initial metadata, details and trailing
        metadata are not ``None``.
        """
        request = b"\x07\x17"

        multi_callable = unary_unary_multi_callable(self._channel)
        with self._control.pause():
            with self.assertRaises(grpc.RpcError) as exception_context:
                multi_callable.with_call(
                    request,
                    timeout=TIMEOUT_SHORT,
                    metadata=(
                        ("test", "ExpiredUnaryRequestBlockingUnaryResponse"),
                    ),
                )

        self.assertIsInstance(exception_context.exception, grpc.Call)
        self.assertIsNotNone(exception_context.exception.initial_metadata())
        self.assertIs(
            grpc.StatusCode.DEADLINE_EXCEEDED,
            exception_context.exception.code(),
        )
        self.assertIsNotNone(exception_context.exception.details())
        self.assertIsNotNone(exception_context.exception.trailing_metadata())

    def testExpiredUnaryRequestFutureUnaryResponse(self):
        """A unary-unary future under ``pause()`` hits DEADLINE_EXCEEDED.

        The future uses ``timeout=TIMEOUT_SHORT``. The done-callback must
        receive the future itself, and the future's ``code()``, ``result()``
        and ``exception()`` must all report DEADLINE_EXCEEDED.
        """
        request = b"\x07\x17"
        callback = Callback()

        multi_callable = unary_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(
                request,
                timeout=TIMEOUT_SHORT,
                metadata=(("test", "ExpiredUnaryRequestFutureUnaryResponse"),),
            )
            response_future.add_done_callback(callback)
            # Presumably blocks until the callback has fired (confirm against
            # Callback in _rpc_test_helpers).
            value_passed_to_callback = callback.value()

        self.assertIs(response_future, value_passed_to_callback)
        self.assertIsNotNone(response_future.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
        self.assertIsNotNone(response_future.details())
        self.assertIsNotNone(response_future.trailing_metadata())
        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()
        self.assertIs(
            grpc.StatusCode.DEADLINE_EXCEEDED,
            exception_context.exception.code(),
        )
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIsNotNone(response_future.traceback())
        self.assertIs(
            grpc.StatusCode.DEADLINE_EXCEEDED,
            response_future.exception().code(),
        )

    def testExpiredUnaryRequestStreamResponse(self):
        """Run the expired unary-stream scenario, regular multi-callable."""
        self._expired_unary_request_stream_response(
            unary_stream_multi_callable(self._channel)
        )

    def testExpiredUnaryRequestStreamResponseNonBlocking(self):
        """Run the expired unary-stream scenario, non-blocking multi-callable."""
        self._expired_unary_request_stream_response(
            unary_stream_non_blocking_multi_callable(self._channel)
        )
497
498
if __name__ == "__main__":
    # When run as a script: apply logging's default configuration, then run
    # this module's tests through unittest with verbosity level 2.
    logging.basicConfig()
    unittest.main(verbosity=2)
502