xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/interop/service.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2019 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""The Python implementation of the TestServicer."""
15
16import time
17
18import grpc
19
20from src.proto.grpc.testing import empty_pb2
21from src.proto.grpc.testing import messages_pb2
22from src.proto.grpc.testing import test_pb2_grpc
23
24_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
25_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
26_US_IN_A_SECOND = 1000 * 1000
27
28
29def _maybe_echo_metadata(servicer_context):
30    """Copies metadata from request to response if it is present."""
31    invocation_metadata = dict(servicer_context.invocation_metadata())
32    if _INITIAL_METADATA_KEY in invocation_metadata:
33        initial_metadatum = (
34            _INITIAL_METADATA_KEY,
35            invocation_metadata[_INITIAL_METADATA_KEY],
36        )
37        servicer_context.send_initial_metadata((initial_metadatum,))
38    if _TRAILING_METADATA_KEY in invocation_metadata:
39        trailing_metadatum = (
40            _TRAILING_METADATA_KEY,
41            invocation_metadata[_TRAILING_METADATA_KEY],
42        )
43        servicer_context.set_trailing_metadata((trailing_metadatum,))
44
45
46def _maybe_echo_status_and_message(request, servicer_context):
47    """Sets the response context code and details if the request asks for them"""
48    if request.HasField("response_status"):
49        servicer_context.set_code(request.response_status.code)
50        servicer_context.set_details(request.response_status.message)
51
52
53class TestService(test_pb2_grpc.TestServiceServicer):
54    def EmptyCall(self, request, context):
55        _maybe_echo_metadata(context)
56        return empty_pb2.Empty()
57
58    def UnaryCall(self, request, context):
59        _maybe_echo_metadata(context)
60        _maybe_echo_status_and_message(request, context)
61        return messages_pb2.SimpleResponse(
62            payload=messages_pb2.Payload(
63                type=messages_pb2.COMPRESSABLE,
64                body=b"\x00" * request.response_size,
65            )
66        )
67
68    def StreamingOutputCall(self, request, context):
69        _maybe_echo_status_and_message(request, context)
70        for response_parameters in request.response_parameters:
71            if response_parameters.interval_us != 0:
72                time.sleep(response_parameters.interval_us / _US_IN_A_SECOND)
73            yield messages_pb2.StreamingOutputCallResponse(
74                payload=messages_pb2.Payload(
75                    type=request.response_type,
76                    body=b"\x00" * response_parameters.size,
77                )
78            )
79
80    def StreamingInputCall(self, request_iterator, context):
81        aggregate_size = 0
82        for request in request_iterator:
83            if request.payload is not None and request.payload.body:
84                aggregate_size += len(request.payload.body)
85        return messages_pb2.StreamingInputCallResponse(
86            aggregated_payload_size=aggregate_size
87        )
88
89    def FullDuplexCall(self, request_iterator, context):
90        _maybe_echo_metadata(context)
91        for request in request_iterator:
92            _maybe_echo_status_and_message(request, context)
93            for response_parameters in request.response_parameters:
94                if response_parameters.interval_us != 0:
95                    time.sleep(
96                        response_parameters.interval_us / _US_IN_A_SECOND
97                    )
98                yield messages_pb2.StreamingOutputCallResponse(
99                    payload=messages_pb2.Payload(
100                        type=request.payload.type,
101                        body=b"\x00" * response_parameters.size,
102                    )
103                )
104
105    # NOTE(nathaniel): Apparently this is the same as the full-duplex call?
106    # NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
107    def HalfDuplexCall(self, request_iterator, context):
108        return self.FullDuplexCall(request_iterator, context)
109