1# Copyright 2020 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14""" 15Provides an interface to xDS Test Server running remotely. 16""" 17import functools 18import logging 19from typing import Iterator, Optional 20 21import framework.rpc 22from framework.rpc import grpc_channelz 23from framework.rpc import grpc_testing 24 25logger = logging.getLogger(__name__) 26 27# Type aliases 28_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient 29_XdsUpdateHealthServiceClient = grpc_testing.XdsUpdateHealthServiceClient 30_HealthClient = grpc_testing.HealthClient 31 32 33class XdsTestServer(framework.rpc.grpc.GrpcApp): 34 """ 35 Represents RPC services implemented in Server component of the xDS test app. 36 https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server 37 """ 38 # A unique host name identifying each server replica. 39 # Server implementation must return this in the SimpleResponse.hostname, 40 # which client uses as the key in rpcs_by_peer map. 41 hostname: str 42 43 def __init__(self, 44 *, 45 ip: str, 46 rpc_port: int, 47 hostname: str, 48 maintenance_port: Optional[int] = None, 49 secure_mode: Optional[bool] = False, 50 xds_host: Optional[str] = None, 51 xds_port: Optional[int] = None, 52 rpc_host: Optional[str] = None): 53 super().__init__(rpc_host=(rpc_host or ip)) 54 self.ip = ip 55 self.rpc_port = rpc_port 56 self.hostname = hostname 57 self.maintenance_port = maintenance_port or rpc_port 58 self.secure_mode = secure_mode 59 self.xds_host, self.xds_port = xds_host, xds_port 60 61 @property 62 @functools.lru_cache(None) 63 def channelz(self) -> _ChannelzServiceClient: 64 return _ChannelzServiceClient( 65 self._make_channel(self.maintenance_port), 66 log_target=f'{self.hostname}:{self.maintenance_port}') 67 68 @property 69 @functools.lru_cache(None) 70 def update_health_service_client(self) -> _XdsUpdateHealthServiceClient: 71 return _XdsUpdateHealthServiceClient( 72 self._make_channel(self.maintenance_port), 73 log_target=f'{self.hostname}:{self.maintenance_port}') 74 75 @property 76 @functools.lru_cache(None) 77 def health_client(self) -> _HealthClient: 78 return _HealthClient( 79 self._make_channel(self.maintenance_port), 80 log_target=f'{self.hostname}:{self.maintenance_port}') 81 82 def set_serving(self): 83 logger.info('[%s] Setting health status to SERVING', self.hostname) 84 self.update_health_service_client.set_serving() 85 logger.info('[%s] Health status %s', self.hostname, 86 self.health_client.check_health()) 87 88 def set_not_serving(self): 89 logger.info('[%s] Setting health status to NOT_SERVING', self.hostname) 90 self.update_health_service_client.set_not_serving() 91 logger.info('[%s] Health status %s', self.hostname, 92 self.health_client.check_health()) 93 94 def set_xds_address(self, xds_host, xds_port: Optional[int] = None): 95 self.xds_host, self.xds_port = xds_host, xds_port 96 97 @property 98 def xds_address(self) -> str: 99 if not self.xds_host: 100 return '' 101 if not self.xds_port: 102 return self.xds_host 103 return f'{self.xds_host}:{self.xds_port}' 104 105 @property 106 def xds_uri(self) -> str: 107 if not self.xds_host: 108 return '' 109 return f'xds:///{self.xds_address}' 110 111 def get_test_server(self) -> grpc_channelz.Server: 112 """Return channelz representation of a server running TestService. 113 114 Raises: 115 GrpcApp.NotFound: Test server not found. 116 """ 117 server = self.channelz.find_server_listening_on_port(self.rpc_port) 118 if not server: 119 raise self.NotFound(f'[{self.hostname}] Server' 120 f'listening on port {self.rpc_port} not found') 121 return server 122 123 def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]: 124 """List all sockets of the test server. 125 126 Raises: 127 GrpcApp.NotFound: Test server not found. 128 """ 129 server = self.get_test_server() 130 return self.channelz.list_server_sockets(server) 131 132 def get_server_socket_matching_client(self, 133 client_socket: grpc_channelz.Socket): 134 """Find test server socket that matches given test client socket. 135 136 Sockets are matched using TCP endpoints (ip:port), further on "address". 137 Server socket remote address matched with client socket local address. 138 139 Raises: 140 GrpcApp.NotFound: Server socket matching client socket not found. 141 """ 142 client_local = self.channelz.sock_address_to_str(client_socket.local) 143 logger.debug( 144 '[%s] Looking for a server socket connected ' 145 'to the client %s', self.hostname, client_local) 146 147 server_socket = self.channelz.find_server_socket_matching_client( 148 self.get_test_server_sockets(), client_socket) 149 if not server_socket: 150 raise self.NotFound(f'[{self.hostname}] Socket ' 151 f'to client {client_local} not found') 152 153 logger.info( 154 '[%s] Found matching socket pair: ' 155 'server(%s) <-> client(%s)', self.hostname, 156 self.channelz.sock_addresses_pretty(server_socket), 157 self.channelz.sock_addresses_pretty(client_socket)) 158 return server_socket 159