xref: /aosp_15_r20/external/autotest/server/hosts/rpc_server_tracker.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import six.moves.http_client
7import logging
8import socket
9import tempfile
10import time
11import six.moves.xmlrpc_client
12
13import common
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.client.common_lib.cros import retry
17
18
19class RpcServerTracker(object):
20    """
21    This class keeps track of all the RPC server connections started on a
22    remote host. The caller can use either |xmlrpc_connect| to start the
23    required type of rpc server on the remote host.
24    The host will cleanup all the open RPC server connections on disconnect.
25    """
26
27    _RPC_PROXY_URL_FORMAT = 'http://localhost:%d'
28    _RPC_HOST_ADDRESS_FORMAT = 'localhost:%d'
29    _RPC_SHUTDOWN_POLLING_PERIOD_SECONDS = 2
30    _RPC_SHUTDOWN_TIMEOUT_SECONDS = 10
31
32    def __init__(self, host):
33        """
34        @param port: The host object associated with this instance of
35                     RpcServerTracker.
36        """
37        self._host = host
38        self._rpc_proxy_map = {}
39
40
41    def _setup_port(self, port, command_name, remote_pid=None):
42        """Sets up a tunnel process and register it to rpc_server_tracker.
43
44        ChromeOS on the target closes down most external ports for security.
45        We could open the port, but doing that would conflict with security
46        tests that check that only expected ports are open.  So, to get to
47        the port on the target we use an ssh tunnel.
48
49        This method assumes that xmlrpc and jsonrpc never conflict, since
50        we can only either have an xmlrpc or a jsonrpc server listening on
51        a remote port. As such, it enforces a single proxy->remote port
52        policy, i.e if one starts a jsonrpc proxy/server from port A->B,
53        and then tries to start an xmlrpc proxy forwarded to the same port,
54        the xmlrpc proxy will override the jsonrpc tunnel process, however:
55
56        1. None of the methods on the xmlrpc proxy will work because
57        the server listening on B is jsonrpc.
58
59        2. The xmlrpc client cannot initiate a termination of the JsonRPC
60        server, as the only use case currently is goofy, which is tied to
61        the factory image. It is much easier to handle a failed xmlrpc
62        call on the client than it is to terminate goofy in this scenario,
63        as doing the latter might leave the DUT in a hard to recover state.
64
65        With the current implementation newer rpc proxy connections will
66        terminate the tunnel processes of older rpc connections tunneling
67        to the same remote port. If methods are invoked on the client
68        after this has happened they will fail with connection closed errors.
69
70        @param port: The remote forwarding port.
71        @param command_name: The name of the remote process, to terminate
72                                using pkill.
73        @param remote_pid: The PID of the remote background process
74                            as a string.
75
76        @return the local port which is used for port forwarding on the ssh
77                    client.
78        """
79        self.disconnect(port)
80        local_port = utils.get_unused_port()
81        tunnel_proc = self._host.create_ssh_tunnel(port, local_port)
82        self._rpc_proxy_map[port] = (command_name, tunnel_proc, remote_pid)
83        return local_port
84
85
86    def _setup_rpc(self, port, command_name, remote_pid=None):
87        """Construct a URL for an rpc connection using ssh tunnel.
88
89        @param port: The remote forwarding port.
90        @param command_name: The name of the remote process, to terminate
91                              using pkill.
92        @param remote_pid: The PID of the remote background process
93                            as a string.
94
95        @return a url that we can use to initiate the rpc connection.
96        """
97        return self._RPC_PROXY_URL_FORMAT % self._setup_port(
98                port, command_name, remote_pid=remote_pid)
99
100
101    def tunnel_connect(self, port):
102        """Construct a host address using ssh tunnel.
103
104        @param port: The remote forwarding port.
105
106        @return a host address using ssh tunnel.
107        """
108        return self._RPC_HOST_ADDRESS_FORMAT % self._setup_port(port, None)
109
110
111    def xmlrpc_connect(self, command, port, command_name=None,
112                       ready_test_name=None, timeout_seconds=10,
113                       logfile=None, request_timeout_seconds=None,
114                       server_desc=None):
115        """Connect to an XMLRPC server on the host.
116
117        The `command` argument should be a simple shell command that
118        starts an XMLRPC server on the given `port`.  The command
119        must not daemonize, and must terminate cleanly on SIGTERM.
120        The command is started in the background on the host, and a
121        local XMLRPC client for the server is created and returned
122        to the caller.
123
124        Note that the process of creating an XMLRPC client makes no
125        attempt to connect to the remote server; the caller is
126        responsible for determining whether the server is running
127        correctly, and is ready to serve requests.
128
129        Optionally, the caller can pass ready_test_name, a string
130        containing the name of a method to call on the proxy.  This
131        method should take no parameters and return successfully only
132        when the server is ready to process client requests.  When
133        ready_test_name is set, xmlrpc_connect will block until the
134        proxy is ready, and throw a TestError if the server isn't
135        ready by timeout_seconds.
136
137        If a server is already running on the remote port, this
138        method will kill it and disconnect the tunnel process
139        associated with the connection before establishing a new one,
140        by consulting the rpc_proxy_map in disconnect.
141
142        @param command Shell command to start the server.
143        @param port Port number on which the server is expected to
144                    be serving.
145        @param command_name String to use as input to `pkill` to
146            terminate the XMLRPC server on the host.
147        @param ready_test_name String containing the name of a
148            method defined on the XMLRPC server.
149        @param timeout_seconds Number of seconds to wait
150            for the server to become 'ready.'  Will throw a
151            TestFail error if server is not ready in time.
152        @param logfile Logfile to send output when running
153            'command' argument.
154        @param request_timeout_seconds Timeout in seconds for an XMLRPC request.
155        @param server_desc: Extra text to report in socket.error descriptions.
156
157        """
158        # Clean up any existing state.  If the caller is willing
159        # to believe their server is down, we ought to clean up
160        # any tunnels we might have sitting around.
161        self.disconnect(port)
162        remote_pid = None
163        if command is not None:
164            if logfile:
165                remote_cmd = '%s > %s 2>&1' % (command, logfile)
166            else:
167                remote_cmd = command
168            remote_pid = self._host.run_background(remote_cmd)
169            logging.debug('Started XMLRPC server on host %s, pid = %s',
170                          self._host.hostname, remote_pid)
171
172        # Tunnel through SSH to be able to reach that remote port.
173        rpc_url = self._setup_rpc(port, command_name, remote_pid=remote_pid)
174        if not server_desc:
175            server_desc = "<%s '%s:%s'>" % (command_name or 'XMLRPC',
176                                            self._host.hostname, port)
177        server_desc = '%s (%s)' % (server_desc, rpc_url.replace('http://', ''))
178        if request_timeout_seconds is not None:
179            proxy = TimeoutXMLRPCServerProxy(
180                    rpc_url, timeout=request_timeout_seconds, allow_none=True)
181        else:
182            proxy = six.moves.xmlrpc_client.ServerProxy(rpc_url, allow_none=True)
183
184        if ready_test_name is not None:
185            # retry.retry logs each attempt; calculate delay_sec to
186            # keep log spam to a dull roar.
187            @retry.retry((socket.error,
188                          six.moves.xmlrpc_client.ProtocolError,
189                          six.moves.http_client.BadStatusLine),
190                         timeout_min=timeout_seconds / 60.0,
191                         delay_sec=min(max(timeout_seconds / 20.0, 0.1), 1))
192            def ready_test():
193                """ Call proxy.ready_test_name(). """
194                try:
195                    getattr(proxy, ready_test_name)()
196                except socket.error as e:
197                    e.filename = server_desc
198                    raise
199
200            try:
201                logging.info('Waiting %d seconds for XMLRPC server '
202                             'to start.', timeout_seconds)
203                ready_test()
204            except Exception as exc:
205                log_lines = []
206                if logfile:
207                    logging.warning('Failed to start XMLRPC server; getting log.')
208                    with tempfile.NamedTemporaryFile() as temp:
209                        self._host.get_file(logfile, temp.name)
210                        with open(temp.name) as f:
211                            log_lines = f.read().rstrip().splitlines()
212                else:
213                    logging.warning('Failed to start XMLRPC server; no log.')
214
215                logging.error(
216                        'Failed to start XMLRPC server:  %s.%s: %s.',
217                        type(exc).__module__, type(exc).__name__,
218                        str(exc).rstrip('.'))
219
220                if isinstance(exc, six.moves.http_client.BadStatusLine):
221                    # BadStatusLine: inject the last log line into the message,
222                    # using the 'line' and 'args' attributes.
223                    if log_lines:
224                        if exc.line:
225                            exc.line = '%s -- Log tail: %r' % (
226                                    exc.line, log_lines[-1])
227                        else:
228                            exc.line = 'Log tail: %r' % (
229                                    log_lines[-1])
230                        exc.args = (exc.line,)
231                elif isinstance(exc, socket.error):
232                    # socket.error: inject the last log line into the message,
233                    # using the 'filename' attribute.
234                    if log_lines:
235                        if exc.filename:
236                            exc.filename = '%s -- Log tail: %r' % (
237                                    exc.filename, log_lines[-1])
238                        else:
239                            exc.filename = 'Log tail: %r' % log_lines[-1]
240                elif log_lines:
241                    # Unusual failure: can't inject the last log line,
242                    # so report it via logging.
243                    logging.error('Log tail: %r', log_lines[-1])
244
245                if len(log_lines) > 1:
246                    # The failure messages include only the last line,
247                    # so report the whole thing if it had more lines.
248                    logging.error('Full XMLRPC server log:\n%s',
249                                  '\n'.join(log_lines))
250
251                self.disconnect(port)
252                raise
253        logging.info('XMLRPC server started successfully.')
254        return proxy
255
256    def disconnect(self, port, pkill=True):
257        """Disconnect from an RPC server on the host.
258
259        Terminates the remote RPC server previously started for
260        the given `port`.  Also closes the local ssh tunnel created
261        for the connection to the host.  This function does not
262        directly alter the state of a previously returned RPC
263        client object; however disconnection will cause all
264        subsequent calls to methods on the object to fail.
265
266        This function does nothing if requested to disconnect a port
267        that was not previously connected via _setup_rpc.
268
269        @param port Port number passed to a previous call to `_setup_rpc()`.
270        @param pkill: if True, ssh in to the server and pkill the process.
271        """
272        if port not in self._rpc_proxy_map:
273            return
274        remote_name, tunnel_proc, remote_pid = self._rpc_proxy_map[port]
275        if pkill and remote_name:
276            # We use 'pkill' to find our target process rather than
277            # a PID, because the host may have rebooted since
278            # connecting, and we don't want to kill an innocent
279            # process with the same PID.
280            #
281            # 'pkill' helpfully exits with status 1 if no target
282            # process  is found, for which run() will throw an
283            # exception.  We don't want that, so we the ignore
284            # status.
285            self._host.run("pkill -f '%s'" % remote_name, ignore_status=True)
286            if remote_pid:
287                logging.info('Waiting for RPC server "%s" shutdown (%s)',
288                             remote_name, remote_pid)
289                start_time = time.time()
290                while (time.time() - start_time <
291                       self._RPC_SHUTDOWN_TIMEOUT_SECONDS):
292                    running_processes = self._host.run(
293                            "pgrep -f '%s'" % remote_name,
294                            ignore_status=True).stdout.split()
295                    if not remote_pid in running_processes:
296                        logging.info('Shut down RPC server %s.', remote_pid)
297                        break
298                    time.sleep(self._RPC_SHUTDOWN_POLLING_PERIOD_SECONDS)
299                    self._host.run("pkill -9 -f '%s'" % remote_name,
300                                   ignore_status=True)
301                else:
302                    raise error.TestError('Failed to shutdown RPC server %s' %
303                                          remote_name)
304
305        self._host.disconnect_ssh_tunnel(tunnel_proc)
306        del self._rpc_proxy_map[port]
307
308
309    def disconnect_all(self):
310        """Disconnect all known RPC proxy ports."""
311        for port in list(self._rpc_proxy_map.keys()):
312            self.disconnect(port)
313
314
315class TimeoutXMLRPCServerProxy(six.moves.xmlrpc_client.ServerProxy):
316    """XMLRPC ServerProxy supporting timeout."""
317    def __init__(self, uri, timeout=20, *args, **kwargs):
318        """Initializes a TimeoutXMLRPCServerProxy.
319
320        @param uri: URI to a XMLRPC server.
321        @param timeout: Timeout in seconds for a XMLRPC request.
322        @param *args: args to xmlrpclib.ServerProxy.
323        @param **kwargs: kwargs to xmlrpclib.ServerProxy.
324
325        """
326        if timeout:
327            kwargs['transport'] = TimeoutXMLRPCTransport(timeout=timeout)
328        six.moves.xmlrpc_client.ServerProxy.__init__(self, uri, *args, **kwargs)
329
330
331class TimeoutXMLRPCTransport(six.moves.xmlrpc_client.Transport):
332    """A Transport subclass supporting timeout."""
333    def __init__(self, timeout=20, *args, **kwargs):
334        """Initializes a TimeoutXMLRPCTransport.
335
336        @param timeout: Timeout in seconds for a HTTP request through this transport layer.
337        @param *args: args to xmlrpclib.Transport.
338        @param **kwargs: kwargs to xmlrpclib.Transport.
339
340        """
341        six.moves.xmlrpc_client.Transport.__init__(self, *args, **kwargs)
342        self.timeout = timeout
343
344
345    def make_connection(self, host):
346        """Overwrites make_connection in xmlrpclib.Transport with timeout.
347
348        @param host: Host address to connect.
349
350        @return: A httplib.HTTPConnection connecting to host with timeout.
351
352        """
353        conn = six.moves.http_client.HTTPConnection(host, timeout=self.timeout)
354        return conn
355