# Copyright 2020 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions that obviate explicit stubs and explicit channels."""

import collections
import datetime
import logging
import os
import threading
from typing import (
    Any,
    AnyStr,
    Callable,
    Dict,
    Iterator,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
)

import grpc
from grpc.experimental import experimental_api

RequestType = TypeVar("RequestType")
ResponseType = TypeVar("ResponseType")

OptionsType = Sequence[Tuple[str, str]]
CacheKey = Tuple[
    str,
    OptionsType,
    Optional[grpc.ChannelCredentials],
    Optional[grpc.Compression],
]

_LOGGER = logging.getLogger(__name__)

# How long an unused channel stays in the cache before the background thread
# closes it. Overridable through the environment; defaults to 10 minutes.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
if _EVICTION_PERIOD_KEY in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY])
    )
    _LOGGER.debug(
        "Setting managed channel eviction period to %s", _EVICTION_PERIOD
    )
else:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)

# Upper bound on the number of cached channels. Overridable through the
# environment; defaults to 256.
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
if _MAXIMUM_CHANNELS_KEY in os.environ:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
else:
    _MAXIMUM_CHANNELS = 2**8

# Default per-RPC timeout in seconds used by the functions below when the
# caller does not pass `timeout`. Overridable through the environment;
# defaults to 60 seconds.
_DEFAULT_TIMEOUT_KEY = "GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS"
if _DEFAULT_TIMEOUT_KEY in os.environ:
    _DEFAULT_TIMEOUT = float(os.environ[_DEFAULT_TIMEOUT_KEY])
    _LOGGER.debug("Setting default timeout seconds to %f", _DEFAULT_TIMEOUT)
else:
    _DEFAULT_TIMEOUT = 60.0


def _hashable_options(
    options: Optional[Sequence[Tuple[Any, Any]]],
) -> Optional[Tuple[Tuple[Any, ...], ...]]:
    """Returns `options` as a tuple of tuples so it can be part of a dict key.

    A tuple of tuples is returned as an equal value (so existing cache keys
    are unaffected); lists, which are unhashable, are converted. None is
    passed through unchanged.
    """
    if options is None:
        return None
    return tuple(tuple(option) for option in options)


def _create_channel(
    target: str,
    options: Sequence[Tuple[str, str]],
    channel_credentials: Optional[grpc.ChannelCredentials],
    compression: Optional[grpc.Compression],
) -> grpc.Channel:
    """Creates a new channel to `target` via grpc.secure_channel().

    Args:
      target: The server address.
      options: Channel arguments forwarded unchanged to the channel.
      channel_credentials: Credentials applied to the whole channel.
      compression: Optional channel-wide compression setting.

    Returns:
      The newly created channel.
    """
    # Lazy %-style arguments: the message is only rendered when DEBUG logging
    # is actually enabled.
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'",
        channel_credentials,
        options,
        compression,
    )
    return grpc.secure_channel(
        target,
        credentials=channel_credentials,
        options=options,
        compression=compression,
    )


class ChannelCache:
    """Process-wide cache of channels keyed by their configuration.

    The single instance is obtained through `ChannelCache.get()`. Constructing
    an instance starts a daemon thread that closes channels whose eviction
    time has passed, and the oldest channels once more than _MAXIMUM_CHANNELS
    are cached. All access to the mapping happens while holding `_lock`.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    _lock: threading.RLock = threading.RLock()
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has acquired `_lock` for the first
    # time; `get()` waits on it before handing out the singleton.
    _eviction_ready: threading.Event = threading.Event()

    # Ordered from least to most recently used. Every entry gets the same
    # eviction period on use, so the first entry always expires first.
    _mapping: "collections.OrderedDict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]"
    _eviction_thread: threading.Thread

    def __init__(self):
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True
        )
        self._eviction_thread.start()

    @staticmethod
    def get() -> "ChannelCache":
        """Returns the singleton cache, creating it on first use."""
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread needs to acquire the
        # lock before it can set this event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Removes the channel stored under `key` and closes it.

        Must be called with `_lock` held.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug(
            "Evicting channel %s with configuration %s.", channel, key
        )
        channel.close()
        del channel

    @staticmethod
    def _perform_evictions():
        """Body of the eviction thread; loops forever evicting channels."""
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                mapping = ChannelCache._singleton._mapping
                if not mapping:
                    # Nothing to evict; sleep until get_channel() notifies.
                    ChannelCache._condition.wait()
                elif len(mapping) > _MAXIMUM_CHANNELS:
                    # Over capacity: drop the least recently used channel.
                    key = next(iter(mapping))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(iter(mapping.items()))
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                        continue
                    time_to_eviction = (eviction_time - now).total_seconds()
                    # NOTE: We aim to *eventually* coalesce to a state in
                    # which no overdue channels are in the cache and the
                    # length of the cache is no longer than
                    # _MAXIMUM_CHANNELS. We tolerate momentary states in
                    # which these two criteria are not met.
                    ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(
        self,
        target: str,
        options: Sequence[Tuple[str, str]],
        channel_credentials: Optional[grpc.ChannelCredentials],
        insecure: bool,
        compression: Optional[grpc.Compression],
        method: str,
        _registered_method: bool,
    ) -> Tuple[grpc.Channel, Optional[int]]:
        """Get a channel from cache or creates a new channel.

        This method also takes care of register method for channel,
        which means we'll register a new call handle if we're calling a
        non-registered method for an existing channel.

        Args:
          target: The server address.
          options: Channel arguments; part of the cache key.
          channel_credentials: Channel credentials, or None to default to SSL
            credentials (unless `insecure` is set).
          insecure: If True, insecure channel credentials are used.
          compression: Optional channel-wide compression; part of the cache
            key.
          method: The name of the RPC method.
          _registered_method: Whether to look up a registered call handle for
            `method` on the channel.

        Returns:
          A tuple with two items. The first item is the channel, second item is
          the call handle if the method is registered, None if it's not registered.

        Raises:
          ValueError: If both `insecure` and `channel_credentials` are given.
        """
        if insecure and channel_credentials:
            raise ValueError(
                "The insecure option is mutually exclusive with "
                + "the channel_credentials option. Please use one "
                + "or the other."
            )
        if insecure:
            channel_credentials = (
                grpc.experimental.insecure_channel_credentials()
            )
        elif channel_credentials is None:
            _LOGGER.debug("Defaulting to SSL channel credentials.")
            channel_credentials = grpc.ssl_channel_credentials()
        # Options may be supplied as a list, which is unhashable and would
        # raise TypeError as part of a dict key; normalize for the key only.
        key = (
            target,
            _hashable_options(options),
            channel_credentials,
            compression,
        )
        with self._lock:
            channel_data = self._mapping.get(key)
            call_handle = None
            if channel_data is not None:
                channel = channel_data[0]
                # The channel already exists; obtain the call handle for this
                # method from it when a registered method is requested.
                if _registered_method:
                    call_handle = channel._get_registered_call_handle(method)
                # Refresh the eviction deadline and mark the entry as the
                # most recently used one.
                self._mapping[key] = (
                    channel,
                    datetime.datetime.now() + _EVICTION_PERIOD,
                )
                self._mapping.move_to_end(key)
                return channel, call_handle

            channel = _create_channel(
                target, options, channel_credentials, compression
            )
            if _registered_method:
                call_handle = channel._get_registered_call_handle(method)
            self._mapping[key] = (
                channel,
                datetime.datetime.now() + _EVICTION_PERIOD,
            )
            # Wake the eviction thread when it may be idle on an empty cache
            # or when the cache has reached its capacity.
            if (
                len(self._mapping) == 1
                or len(self._mapping) >= _MAXIMUM_CHANNELS
            ):
                self._condition.notify()
            return channel, call_handle

    def _test_only_channel_count(self) -> int:
        """Returns the number of channels currently cached (for tests)."""
        with self._lock:
            return len(self._mapping)


@experimental_api
# pylint: disable=too-many-locals
def unary_unary(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.unary_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    wait_for_ready = wait_for_ready if wait_for_ready is not None else True
    return multicallable(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )


@experimental_api
# pylint: disable=too-many-locals
def unary_stream(
    request: RequestType,
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.unary_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    wait_for_ready = wait_for_ready if wait_for_ready is not None else True
    return multicallable(
        request,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )


@experimental_api
# pylint: disable=too-many-locals
def stream_unary(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.stream_unary(
        method, request_serializer, response_deserializer, method_handle
    )
    wait_for_ready = wait_for_ready if wait_for_ready is not None else True
    return multicallable(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )


@experimental_api
# pylint: disable=too-many-locals
def stream_stream(
    request_iterator: Iterator[RequestType],
    target: str,
    method: str,
    request_serializer: Optional[Callable[[Any], bytes]] = None,
    response_deserializer: Optional[Callable[[bytes], Any]] = None,
    options: Sequence[Tuple[AnyStr, AnyStr]] = (),
    channel_credentials: Optional[grpc.ChannelCredentials] = None,
    insecure: bool = False,
    call_credentials: Optional[grpc.CallCredentials] = None,
    compression: Optional[grpc.Compression] = None,
    wait_for_ready: Optional[bool] = None,
    timeout: Optional[float] = _DEFAULT_TIMEOUT,
    metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None,
    _registered_method: Optional[bool] = False,
) -> Iterator[ResponseType]:
    """Invokes a stream-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional :term:`serializer` for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional :term:`deserializer` for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (:term:`channel_arguments` in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      insecure: If True, specifies channel_credentials as
        grpc.experimental.insecure_channel_credentials(). This option is
        mutually exclusive with the `channel_credentials` option.
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to True.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised. If timeout is unspecified,
        defaults to a timeout controlled by the
        GRPC_PYTHON_DEFAULT_TIMEOUT_SECONDS environment variable. If that is
        unset, defaults to 60 seconds. Supply a value of None to indicate that
        no timeout should be enforced.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    channel, method_handle = ChannelCache.get().get_channel(
        target,
        options,
        channel_credentials,
        insecure,
        compression,
        method,
        _registered_method,
    )
    multicallable = channel.stream_stream(
        method, request_serializer, response_deserializer, method_handle
    )
    wait_for_ready = wait_for_ready if wait_for_ready is not None else True
    return multicallable(
        request_iterator,
        metadata=metadata,
        wait_for_ready=wait_for_ready,
        credentials=call_credentials,
        timeout=timeout,
    )