1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15Provides an interface to xDS Test Server running remotely.
16"""
17import functools
18import logging
19from typing import Iterator, Optional
20
21import framework.rpc
22from framework.rpc import grpc_channelz
23from framework.rpc import grpc_testing
24
25logger = logging.getLogger(__name__)
26
27# Type aliases
28_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
29_XdsUpdateHealthServiceClient = grpc_testing.XdsUpdateHealthServiceClient
30_HealthClient = grpc_testing.HealthClient
31
32
33class XdsTestServer(framework.rpc.grpc.GrpcApp):
34    """
35    Represents RPC services implemented in Server component of the xDS test app.
36    https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
37    """
38    # A unique host name identifying each server replica.
39    # Server implementation must return this in the SimpleResponse.hostname,
40    # which client uses as the key in rpcs_by_peer map.
41    hostname: str
42
43    def __init__(self,
44                 *,
45                 ip: str,
46                 rpc_port: int,
47                 hostname: str,
48                 maintenance_port: Optional[int] = None,
49                 secure_mode: Optional[bool] = False,
50                 xds_host: Optional[str] = None,
51                 xds_port: Optional[int] = None,
52                 rpc_host: Optional[str] = None):
53        super().__init__(rpc_host=(rpc_host or ip))
54        self.ip = ip
55        self.rpc_port = rpc_port
56        self.hostname = hostname
57        self.maintenance_port = maintenance_port or rpc_port
58        self.secure_mode = secure_mode
59        self.xds_host, self.xds_port = xds_host, xds_port
60
61    @property
62    @functools.lru_cache(None)
63    def channelz(self) -> _ChannelzServiceClient:
64        return _ChannelzServiceClient(
65            self._make_channel(self.maintenance_port),
66            log_target=f'{self.hostname}:{self.maintenance_port}')
67
68    @property
69    @functools.lru_cache(None)
70    def update_health_service_client(self) -> _XdsUpdateHealthServiceClient:
71        return _XdsUpdateHealthServiceClient(
72            self._make_channel(self.maintenance_port),
73            log_target=f'{self.hostname}:{self.maintenance_port}')
74
75    @property
76    @functools.lru_cache(None)
77    def health_client(self) -> _HealthClient:
78        return _HealthClient(
79            self._make_channel(self.maintenance_port),
80            log_target=f'{self.hostname}:{self.maintenance_port}')
81
82    def set_serving(self):
83        logger.info('[%s] Setting health status to SERVING', self.hostname)
84        self.update_health_service_client.set_serving()
85        logger.info('[%s] Health status %s', self.hostname,
86                    self.health_client.check_health())
87
88    def set_not_serving(self):
89        logger.info('[%s] Setting health status to NOT_SERVING', self.hostname)
90        self.update_health_service_client.set_not_serving()
91        logger.info('[%s] Health status %s', self.hostname,
92                    self.health_client.check_health())
93
94    def set_xds_address(self, xds_host, xds_port: Optional[int] = None):
95        self.xds_host, self.xds_port = xds_host, xds_port
96
97    @property
98    def xds_address(self) -> str:
99        if not self.xds_host:
100            return ''
101        if not self.xds_port:
102            return self.xds_host
103        return f'{self.xds_host}:{self.xds_port}'
104
105    @property
106    def xds_uri(self) -> str:
107        if not self.xds_host:
108            return ''
109        return f'xds:///{self.xds_address}'
110
111    def get_test_server(self) -> grpc_channelz.Server:
112        """Return channelz representation of a server running TestService.
113
114        Raises:
115            GrpcApp.NotFound: Test server not found.
116        """
117        server = self.channelz.find_server_listening_on_port(self.rpc_port)
118        if not server:
119            raise self.NotFound(f'[{self.hostname}] Server'
120                                f'listening on port {self.rpc_port} not found')
121        return server
122
123    def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]:
124        """List all sockets of the test server.
125
126        Raises:
127            GrpcApp.NotFound: Test server not found.
128        """
129        server = self.get_test_server()
130        return self.channelz.list_server_sockets(server)
131
132    def get_server_socket_matching_client(self,
133                                          client_socket: grpc_channelz.Socket):
134        """Find test server socket that matches given test client socket.
135
136        Sockets are matched using TCP endpoints (ip:port), further on "address".
137        Server socket remote address matched with client socket local address.
138
139         Raises:
140             GrpcApp.NotFound: Server socket matching client socket not found.
141         """
142        client_local = self.channelz.sock_address_to_str(client_socket.local)
143        logger.debug(
144            '[%s] Looking for a server socket connected '
145            'to the client %s', self.hostname, client_local)
146
147        server_socket = self.channelz.find_server_socket_matching_client(
148            self.get_test_server_sockets(), client_socket)
149        if not server_socket:
150            raise self.NotFound(f'[{self.hostname}] Socket '
151                                f'to client {client_local} not found')
152
153        logger.info(
154            '[%s] Found matching socket pair: '
155            'server(%s) <-> client(%s)', self.hostname,
156            self.channelz.sock_addresses_pretty(server_socket),
157            self.channelz.sock_addresses_pretty(client_socket))
158        return server_socket
159