# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python example of utilizing wait-for-ready flag."""

import asyncio
from contextlib import contextmanager
import logging
import socket
from typing import Iterable
from typing import Optional

import grpc

helloworld_pb2, helloworld_pb2_grpc = grpc.protos_and_services(
    "helloworld.proto"
)

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)


@contextmanager
def get_free_loopback_tcp_port() -> Iterable[str]:
    """Reserve an OS-assigned TCP port and yield it as ``localhost:<port>``.

    A socket (IPv6 when the platform supports it, IPv4 otherwise) is bound to
    port 0 so the OS picks a free port. The socket is bound but never put into
    listening mode, and it stays open for the whole ``with`` body.

    NOTE(review): ``main`` starts the server on this same port while the
    socket is still open -- presumably this relies on port reuse being
    enabled by the gRPC server; confirm on the target platform.

    Yields:
        The address string ``"localhost:<port>"``.
    """
    family = socket.AF_INET6 if socket.has_ipv6 else socket.AF_INET
    # The ``with`` block guarantees the socket is closed even when the caller's
    # body raises (the original only closed it on the success path).
    with socket.socket(family) as tcp_socket:
        tcp_socket.bind(("", 0))
        port = tcp_socket.getsockname()[1]
        yield f"localhost:{port}"


class Greeter(helloworld_pb2_grpc.GreeterServicer):
    """Greeter service implementation that answers with a greeting."""

    async def SayHello(
        self, request: helloworld_pb2.HelloRequest, unused_context
    ) -> helloworld_pb2.HelloReply:
        """Return a ``HelloReply`` whose message greets ``request.name``."""
        return helloworld_pb2.HelloReply(message=f"Hello, {request.name}!")


def create_server(server_address: str):
    """Build (but do not start) an AsyncIO server bound to ``server_address``.

    Args:
        server_address: Address of the form ``host:port`` to listen on.

    Returns:
        The configured ``grpc.aio`` server with the Greeter service registered.
    """
    server = grpc.aio.server()
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    bound_port = server.add_insecure_port(server_address)
    # Sanity check for this example: the server must have bound the exact port
    # that was requested in the address.
    assert bound_port == int(server_address.split(":")[-1])
    return server


async def process(
    stub: helloworld_pb2_grpc.GreeterStub, wait_for_ready: Optional[bool] = None
) -> None:
    """Issue one ``SayHello`` RPC and log the outcome.

    The asserts encode what this example expects: an RPC that fails must have
    failed with ``UNAVAILABLE`` and must not have had wait-for-ready enabled,
    while an RPC that succeeds must have had wait-for-ready enabled.

    Args:
        stub: Greeter stub used to send the request.
        wait_for_ready: Value forwarded to the RPC's ``wait_for_ready`` option.
    """
    try:
        response = await stub.SayHello(
            helloworld_pb2.HelloRequest(name="you"),
            wait_for_ready=wait_for_ready,
        )
        message = response.message
    except grpc.aio.AioRpcError as rpc_error:
        assert rpc_error.code() == grpc.StatusCode.UNAVAILABLE
        assert not wait_for_ready
        message = rpc_error
    else:
        assert wait_for_ready
    _LOGGER.info(
        "Wait-for-ready %s, client received: %s",
        "enabled" if wait_for_ready else "disabled",
        message,
    )


async def main() -> None:
    """Run the wait-for-ready demonstration end to end.

    Two RPCs are fired before any server exists: one without wait-for-ready
    and one with it. Once the channel reports TRANSIENT_FAILURE the server is
    started, and both RPC tasks are awaited.
    """
    # Pick a random free port
    with get_free_loopback_tcp_port() as server_address:
        # Create gRPC channel
        channel = grpc.aio.insecure_channel(server_address)
        try:
            stub = helloworld_pb2_grpc.GreeterStub(channel)

            # Fire an RPC without wait_for_ready
            fail_fast_task = asyncio.create_task(
                process(stub, wait_for_ready=False)
            )
            # Fire an RPC with wait_for_ready
            wait_for_ready_task = asyncio.create_task(
                process(stub, wait_for_ready=True)
            )

            # Wait for the channel entering TRANSIENT FAILURE state.
            state = channel.get_state()
            while state != grpc.ChannelConnectivity.TRANSIENT_FAILURE:
                await channel.wait_for_state_change(state)
                state = channel.get_state()

            # Start the server to handle the RPC
            server = create_server(server_address)
            await server.start()
            try:
                # Expected to fail with StatusCode.UNAVAILABLE.
                await fail_fast_task
                # Expected to succeed.
                await wait_for_ready_task
            finally:
                # Stop the server even if one of the expectations failed.
                await server.stop(None)
        finally:
            # Always release the channel, mirroring the original shutdown
            # order (server first, then channel).
            await channel.close()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # asyncio.run creates and closes the event loop itself; relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated.
    asyncio.run(main())