xref: /aosp_15_r20/external/autotest/server/hosts/ssh_multiplex.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright 2017 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Lifrom __future__ import absolute_import
7*9c5db199SXin Lifrom __future__ import division
8*9c5db199SXin Lifrom __future__ import print_function
9*9c5db199SXin Li
10*9c5db199SXin Liimport logging
11*9c5db199SXin Liimport multiprocessing
12*9c5db199SXin Liimport os
13*9c5db199SXin Liimport threading
14*9c5db199SXin Li
15*9c5db199SXin Lifrom autotest_lib.client.common_lib import autotemp
16*9c5db199SXin Lifrom autotest_lib.server import utils
17*9c5db199SXin Liimport six
18*9c5db199SXin Li
19*9c5db199SXin Li# TODO b:169251326 terms below are set outside of this codebase
20*9c5db199SXin Li# and should be updated when possible. ("master" -> "main")
21*9c5db199SXin Li_MAIN_SSH_COMMAND_TEMPLATE = (
22*9c5db199SXin Li        '/usr/bin/ssh -a -x -N '
23*9c5db199SXin Li        '-o ControlMaster=yes '  # Create multiplex socket. # nocheck
24*9c5db199SXin Li        '-o ControlPath=%(socket)s '
25*9c5db199SXin Li        '-o StrictHostKeyChecking=no '
26*9c5db199SXin Li        '-o UserKnownHostsFile=/dev/null '
27*9c5db199SXin Li        '-o BatchMode=yes '
28*9c5db199SXin Li        '-o ConnectTimeout=30 '
29*9c5db199SXin Li        '-o ServerAliveInterval=30 '
30*9c5db199SXin Li        '-o ServerAliveCountMax=1 '
31*9c5db199SXin Li        '-o ConnectionAttempts=1 '
32*9c5db199SXin Li        '-o Protocol=2 '
33*9c5db199SXin Li        '-l %(user)s %(port)s %(hostname)s')
34*9c5db199SXin Li
35*9c5db199SXin Li
class MainSsh(object):
    """Manages a multiplexed ssh connection to a single endpoint.

    A background `ssh -N` process is started with ControlMaster enabled and
    its control socket placed in a private temporary directory. Other ssh
    invocations can share that connection by adding the option returned by
    `ssh_option`.
    """

    def __init__(self, hostname, user, port):
        """Records the endpoint; no process is started until maybe_start().

        @param hostname: Host name of the endpoint.
        @param user: User name to log in as.
        @param port: Port sshd is listening on. If falsy, no '-p' option is
                     passed to ssh.
        """
        self._hostname = hostname
        self._user = user
        self._port = port

        # The background ssh job and the temp dir holding its control socket.
        # Both are None while no connection is being managed.
        self._main_job = None
        self._main_tempdir = None

        # A multiprocessing lock, because several processes may try in
        # parallel to clean up and recreate the connection (see maybe_start).
        self._lock = multiprocessing.Lock()

    def __del__(self):
        # If __init__ raised before _lock was assigned there is nothing to
        # release, and close() would fail with AttributeError.
        if not hasattr(self, '_lock'):
            return
        self.close()

    @property
    def _socket_path(self):
        """Path of the control socket; only valid while _main_tempdir is set."""
        return os.path.join(self._main_tempdir.name, 'socket')

    @property
    def ssh_option(self):
        """Returns the ssh option to use this multiplexed ssh.

        If no socket directory is set up (the background process was never
        started, failed to start, or has been closed), returns an empty string.
        """
        if not self._main_tempdir:
            return ''
        return '-o ControlPath=%s' % (self._socket_path,)

    def maybe_start(self, timeout=5):
        """Starts the background process to run multiplex ssh connection.

        If there already is a background process running, this does nothing.
        If there is a stale process or a stale socket, first clean them up,
        then create a background process.

        @param timeout: timeout in seconds (default 5) to wait for main ssh
                        connection to be established. If timeout is reached, a
                        warning message is logged, but no other action is
                        taken.

        @raises: whatever utils.BgJob raises if the ssh process cannot be
                 launched; in that case the temp directory is cleaned up
                 first.
        """
        # Multiple processes might try in parallel to clean up the old main
        # ssh connection and create a new one, therefore use a lock to protect
        # against race conditions.
        with self._lock:
            # If a previously started main SSH connection is not running
            # anymore (socket gone, or Popen.poll() reports an exit code),
            # it needs to be cleaned up and then restarted.
            if (self._main_job and (not os.path.exists(self._socket_path) or
                                    self._main_job.sp.poll() is not None)):
                logging.info(
                        'Main-ssh connection to %s is down.', self._hostname)
                self._close_internal()

            # Start a new main SSH connection.
            if not self._main_job:
                # Create a shared socket in a temp location.
                self._main_tempdir = autotemp.tempdir(dir=_short_tmpdir())

                # Start the main SSH connection in the background.
                main_cmd = _MAIN_SSH_COMMAND_TEMPLATE % {
                        'hostname': self._hostname,
                        'user': self._user,
                        'port': "-p %s" % self._port if self._port else "",
                        'socket': self._socket_path,
                }
                logging.info(
                    'Starting main-ssh connection \'%s\'', main_cmd)
                try:
                    self._main_job = utils.BgJob(
                        main_cmd, nickname='main-ssh',
                        stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL,
                        unjoinable=True)
                except Exception:
                    # Don't leak the temp dir, or leave ssh_option pointing at
                    # a socket that will never be created.
                    self._close_internal()
                    raise

                # To prevent a race between the main ssh connection
                # startup and its first attempted use, wait for socket file to
                # exist before returning.
                try:
                    utils.poll_for_condition(
                            condition=lambda: os.path.exists(self._socket_path),
                            timeout=timeout,
                            sleep_interval=0.2,
                            desc='main-ssh connection up')
                except utils.TimeoutError:
                    # poll_for_condition is relied on to log the timeout.
                    pass

    def close(self):
        """Releases all resources used by multiplexed ssh connection."""
        with self._lock:
            self._close_internal()

    def _close_internal(self):
        """Kills the background ssh process and removes its temp directory.

        The caller must already hold self._lock. Safe to call when nothing is
        running; each resource is released only if present.
        """
        if self._main_job:
            logging.debug('Nuking ssh main_job')
            utils.nuke_subprocess(self._main_job.sp)
            self._main_job = None

        if self._main_tempdir:
            logging.debug('Cleaning ssh main_tempdir')
            self._main_tempdir.clean()
            self._main_tempdir = None
139*9c5db199SXin Li
140*9c5db199SXin Li
class ConnectionPool(object):
    """Holds one MainSsh instance per (hostname, user, port) endpoint."""

    def __init__(self):
        # Maps (hostname, user, port) -> MainSsh. Guarded by self._lock.
        self._pool = {}
        self._lock = threading.Lock()

    def get(self, hostname, user, port):
        """Returns MainSsh instance for the given endpoint.

        If the pool holds the instance already, returns it. If not, create the
        instance, and returns it.

        Caller has the responsibility to call maybe_start() before using it.

        @param hostname: Host name of the endpoint.
        @param user: User name to log in.
        @param port: Port number sshd is listening.
        """
        key = (hostname, user, port)
        logging.debug('Get main ssh connection for %s@%s%s', user, hostname,
                      ":%s" % port if port else "")

        with self._lock:
            conn = self._pool.get(key)
            if conn is None:
                conn = MainSsh(hostname, user, port)
                self._pool[key] = conn
            return conn

    def shutdown(self):
        """Closes all ssh multiplex connections.

        The instances stay in the pool, so a later get() returns the same
        (now closed) instance.
        """
        # Take a snapshot under the lock: iterating the live dict while
        # another thread runs get() can raise "dictionary changed size during
        # iteration". close() runs outside the lock since it may block.
        with self._lock:
            connections = list(self._pool.values())
        for ssh in connections:
            ssh.close()
175*9c5db199SXin Li
176*9c5db199SXin Li
177*9c5db199SXin Lidef _short_tmpdir():
178*9c5db199SXin Li    # crbug/865171 Unix domain socket paths are limited to 108 characters.
179*9c5db199SXin Li    # crbug/945523 Swarming does not like too many top-level directories in
180*9c5db199SXin Li    # /tmp.
181*9c5db199SXin Li    # So use a shared parent directory in /tmp
182*9c5db199SXin Li    user = os.environ.get("USER", "no_USER")[:8]
183*9c5db199SXin Li    d = '/tmp/ssh-main_%s' % user
184*9c5db199SXin Li    if not os.path.exists(d):
185*9c5db199SXin Li        os.mkdir(d)
186*9c5db199SXin Li    return d
187