# xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_simple_stubs.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
# Copyright 2020 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions that obviate explicit stubs and explicit channels."""
15
16import collections
17import datetime
18import logging
19import os
20import threading
21from typing import (
22    Any,
23    AnyStr,
24    Callable,
25    Dict,
26    Iterator,
27    Optional,
28    Sequence,
29    Tuple,
30    TypeVar,
31    Union,
32)
33
34import grpc
35from grpc.experimental import experimental_api
36
# Unconstrained type variables used in the signatures of the public RPC
# helpers for the request and response message types.
RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

# Channel arguments expressed as a sequence of (key, value) pairs.
OptionsType = Sequence[Tuple[str, str]]

# Shape of the key under which a managed channel is cached:
# (target, channel options, channel credentials, compression).
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]
47
_LOGGER = logging.getLogger(__name__)

# NOTE: All three settings below are read from the environment exactly once,
# when this module is imported. A value that cannot be parsed as a number
# raises ValueError at import time.

# Lifetime added to "now" whenever a managed channel is stored or reused;
# once it elapses the channel becomes eligible for eviction.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
if _EVICTION_PERIOD_KEY in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY])
    )
    _LOGGER.debug(
        "Setting managed channel eviction period to %s", _EVICTION_PERIOD
    )
else:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)

# Upper bound on the number of managed channels kept in the cache.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
if _MAXIMUM_CHANNELS_KEY in os.environ:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
else:
    _MAXIMUM_CHANNELS = 2**8

# Default value, in seconds, of the `timeout` parameter of the RPC helpers.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
if _DEFAULT_TIMEOUT_KEY in os.environ:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
else:
    _DEFAULT_TIMEOUT = 60.0
74
75
def _create_channel(
    target: str,
    options: Sequence[Tuple[str, str]],
    channel_credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
) -> grpc.Channel:
    """Create a new channel to `target` through `grpc.secure_channel`.

    Args:
      target: The server address.
      options: Channel arguments forwarded unchanged to the channel.
      channel_credentials: Credentials forwarded as the channel's
        `credentials` argument.
      compression: Compression setting forwarded to the channel.

    Returns:
      The channel returned by `grpc.secure_channel`.
    """
    # Hand the values to the logger as arguments so the message is only
    # rendered when DEBUG logging is actually enabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'",
        channel_credentials,
        options,
        compression,
    )
    return grpc.secure_channel(
        target,
        credentials=channel_credentials,
        options=options,
        compression=compression,
    )
92
93
class ChannelCache:
    """Process-wide cache of channels shared by the stub-less RPC helpers.

    Channels are stored in insertion order and moved to the back whenever
    they are reused, so the first entry is always the one that was used
    least recently. A daemon thread closes entries whose eviction time has
    passed, and the oldest entries whenever the cache holds more than
    `_MAXIMUM_CHANNELS` channels.

    Obtain the shared instance with `ChannelCache.get()`.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    # All state below is shared by every caller and by the eviction thread;
    # `_condition` is built on `_lock`, so waiting on it releases the lock.
    _lock: threading.RLock = threading.RLock()
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has entered its loop.
    _eviction_ready: threading.Event = threading.Event()

    # Maps a cache key to (channel, time at which it may be evicted).
    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Create an empty cache and start its daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True
        )
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Return the shared cache, creating it on first use.

        Blocks until the eviction thread has signalled that it is running.
        """
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Remove `key` from the cache and close its channel.

        The caller must hold `_lock`. Raises KeyError if `key` is absent.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug(
            "Evicting channel %s with configuration %s.", channel, key
        )
        channel.close()
        del channel

    @staticmethod
    def _perform_evictions():
        """Body of the eviction thread; loops forever and never returns."""
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                cache = ChannelCache._singleton
                if not cache._mapping:
                    # Nothing to evict: sleep until a channel is added.
                    ChannelCache._condition.wait()
                elif len(cache._mapping) > _MAXIMUM_CHANNELS:
                    # Over capacity: drop the least recently used channel.
                    key = next(iter(cache._mapping))
                    cache._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(
                        iter(cache._mapping.items())
                    )
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        cache._evict_locked(key)
                        continue
                    else:
                        time_to_eviction = (eviction_time - now).total_seconds()
                        # NOTE: We aim to *eventually* coalesce to a state in
                        # which no overdue channels are in the cache and the
                        # length of the cache is no longer than
                        # _MAXIMUM_CHANNELS. We tolerate momentary states in
                        # which these two criteria are not met.
                        ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(
        self,
        target: str,
        options: Sequence[Tuple[str, str]],
        channel_credentials: Optional[grpc.ChannelCredentials],
        insecure: bool,
        compression: Optional[grpc.Compression],
        method: str,
        _registered_method: bool,
    ) -> Tuple[grpc.Channel, Optional[int]]:
        """Get a channel from cache or creates a new channel.

        This method also takes care of register method for channel,
          which means we'll register a new call handle if we're calling a
          non-registered method for an existing channel.

        Args:
          target: The server address.
          options: Channel arguments. Any sequence of pairs is accepted,
            including lists; a hashable copy is used for the cache key.
          channel_credentials: Credentials for the channel. When None and
            `insecure` is false, SSL channel credentials are used.
          insecure: If True, insecure channel credentials are used.
          compression: Compression setting for the channel.
          method: The name of the RPC method.
          _registered_method: Whether to look up a registered call handle
            for `method` on the channel.

        Returns:
            A tuple with two items. The first item is the channel, second item is
              the call handle if the method is registered, None if it's not registered.

        Raises:
          ValueError: If both `insecure` and `channel_credentials` are given.
        """
        if insecure and channel_credentials:
            raise ValueError(
                "The insecure option is mutually exclusive with "
                + "the channel_credentials option. Please use one "
                + "or the other."
            )
        if insecure:
            channel_credentials = (
                grpc.experimental.insecure_channel_credentials()
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # The key is used for a dict lookup, so every component must be
        # hashable. Options are frequently supplied as a list (or a list of
        # lists), which would raise "TypeError: unhashable type"; key on an
        # equivalent tuple of tuples instead. The caller's original `options`
        # object is still what gets passed to the channel.
        hashable_options = (
            tuple(tuple(option) for option in options) if options else ()
        )
        # NOTE(review): the credentials object itself is part of the key. If
        # credentials compare by identity, a freshly created default
        # credentials object would never match an earlier entry -- confirm.
        key = (target, hashable_options, channel_credentials, compression)
        with self._lock:
            channel_data = self._mapping.get(key, None)
            call_handle = None
            if channel_data is not None:
                channel = channel_data[0]
                # Register a new call handle if we're calling a registered method for an
                # existing channel and this method is not registered.
                if _registered_method:
                    call_handle = channel._get_registered_call_handle(method)
                # Refresh the eviction deadline and mark the channel as the
                # most recently used entry.
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                self._mapping.move_to_end(key)
                return channel, call_handle
            else:
                channel = _create_channel(
                    target, options, channel_credentials, compression
                )
                if _registered_method:
                    call_handle = channel._get_registered_call_handle(method)
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                # Wake the eviction thread when the cache stops being empty
                # (it is waiting without a timeout) or may now be at or over
                # capacity.
                if (
                    len(self._mapping) == 1
                    or len(self._mapping) >= _MAXIMUM_CHANNELS
                ):
                    self._condition.notify()
                return channel, call_handle

    def _test_only_channel_count(self) -> int:
        """Return the number of channels currently cached (for tests)."""
        with self._lock:
            return len(self._mapping)
224
225
@experimental_api
# pylint: disable=too-many-locals
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The single request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials() or
        grpc.insecure_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Private. Forwarded to the channel cache; when true,
        a registered call handle for `method` is looked up on the channel
        and handed to the multicallable.

    Returns:
      The response to the RPC.

    Raises:
      ValueError: If both `insecure` and `channel_credentials` are supplied.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.unary_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # Unlike a plain channel call, an unspecified wait_for_ready means True.
    if wait_for_ready is None:
        wait_for_ready = True
    return multicallable(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
316
317
@experimental_api
# pylint: disable=too-many-locals
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The single request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Private. Forwarded to the channel cache; when true,
        a registered call handle for `method` is looked up on the channel
        and handed to the multicallable.

    Returns:
      An iterator of responses.

    Raises:
      ValueError: If both `insecure` and `channel_credentials` are supplied.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.unary_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # Unlike a plain channel call, an unspecified wait_for_ready means True.
    if wait_for_ready is None:
        wait_for_ready = True
    return multicallable(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
407
408
@experimental_api
# pylint: disable=too-many-locals
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Private. Forwarded to the channel cache; when true,
        a registered call handle for `method` is looked up on the channel
        and handed to the multicallable.

    Returns:
      The response to the RPC.

    Raises:
      ValueError: If both `insecure` and `channel_credentials` are supplied.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.stream_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    # Unlike a plain channel call, an unspecified wait_for_ready means True.
    if wait_for_ready is None:
        wait_for_ready = True
    return multicallable(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
498
499
@experimental_api
# pylint: disable=too-many-locals
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        :term:`grpc.insecure_channel_credentials()`. This option is mutually
        exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.
      _registered_method: Private. Forwarded to the channel cache; when true,
        a registered call handle for `method` is looked up on the channel
        and handed to the multicallable.

    Returns:
      An iterator of responses.

    Raises:
      ValueError: If both `insecure` and `channel_credentials` are supplied.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.stream_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    # Unlike a plain channel call, an unspecified wait_for_ready means True.
    if wait_for_ready is None:
        wait_for_ready = True
    return multicallable(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )
589