1*9c5db199SXin Li# Lint as: python2, python3 2*9c5db199SXin Li# Copyright 2017 The Chromium OS Authors. All rights reserved. 3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be 4*9c5db199SXin Li# found in the LICENSE file. 5*9c5db199SXin Li 6*9c5db199SXin Lifrom __future__ import absolute_import 7*9c5db199SXin Lifrom __future__ import division 8*9c5db199SXin Lifrom __future__ import print_function 9*9c5db199SXin Li 10*9c5db199SXin Liimport logging 11*9c5db199SXin Liimport multiprocessing 12*9c5db199SXin Liimport os 13*9c5db199SXin Liimport threading 14*9c5db199SXin Li 15*9c5db199SXin Lifrom autotest_lib.client.common_lib import autotemp 16*9c5db199SXin Lifrom autotest_lib.server import utils 17*9c5db199SXin Liimport six 18*9c5db199SXin Li 19*9c5db199SXin Li# TODO b:169251326 terms below are set outside of this codebase 20*9c5db199SXin Li# and should be updated when possible. ("master" -> "main") 21*9c5db199SXin Li_MAIN_SSH_COMMAND_TEMPLATE = ( 22*9c5db199SXin Li '/usr/bin/ssh -a -x -N ' 23*9c5db199SXin Li '-o ControlMaster=yes ' # Create multiplex socket. # nocheck 24*9c5db199SXin Li '-o ControlPath=%(socket)s ' 25*9c5db199SXin Li '-o StrictHostKeyChecking=no ' 26*9c5db199SXin Li '-o UserKnownHostsFile=/dev/null ' 27*9c5db199SXin Li '-o BatchMode=yes ' 28*9c5db199SXin Li '-o ConnectTimeout=30 ' 29*9c5db199SXin Li '-o ServerAliveInterval=30 ' 30*9c5db199SXin Li '-o ServerAliveCountMax=1 ' 31*9c5db199SXin Li '-o ConnectionAttempts=1 ' 32*9c5db199SXin Li '-o Protocol=2 ' 33*9c5db199SXin Li '-l %(user)s %(port)s %(hostname)s') 34*9c5db199SXin Li 35*9c5db199SXin Li 36*9c5db199SXin Liclass MainSsh(object): 37*9c5db199SXin Li """Manages multiplex ssh connection.""" 38*9c5db199SXin Li 39*9c5db199SXin Li def __init__(self, hostname, user, port): 40*9c5db199SXin Li self._hostname = hostname 41*9c5db199SXin Li self._user = user 42*9c5db199SXin Li self._port = port 43*9c5db199SXin Li 44*9c5db199SXin Li self._main_job = None 45*9c5db199SXin Li self._main_tempdir = None 46*9c5db199SXin Li 47*9c5db199SXin Li self._lock = multiprocessing.Lock() 48*9c5db199SXin Li 49*9c5db199SXin Li def __del__(self): 50*9c5db199SXin Li self.close() 51*9c5db199SXin Li 52*9c5db199SXin Li @property 53*9c5db199SXin Li def _socket_path(self): 54*9c5db199SXin Li return os.path.join(self._main_tempdir.name, 'socket') 55*9c5db199SXin Li 56*9c5db199SXin Li @property 57*9c5db199SXin Li def ssh_option(self): 58*9c5db199SXin Li """Returns the ssh option to use this multiplexed ssh. 59*9c5db199SXin Li 60*9c5db199SXin Li If background process is not running, returns an empty string. 61*9c5db199SXin Li """ 62*9c5db199SXin Li if not self._main_tempdir: 63*9c5db199SXin Li return '' 64*9c5db199SXin Li return '-o ControlPath=%s' % (self._socket_path,) 65*9c5db199SXin Li 66*9c5db199SXin Li def maybe_start(self, timeout=5): 67*9c5db199SXin Li """Starts the background process to run multiplex ssh connection. 68*9c5db199SXin Li 69*9c5db199SXin Li If there already is a background process running, this does nothing. 70*9c5db199SXin Li If there is a stale process or a stale socket, first clean them up, 71*9c5db199SXin Li then create a background process. 72*9c5db199SXin Li 73*9c5db199SXin Li @param timeout: timeout in seconds (default 5) to wait for main ssh 74*9c5db199SXin Li connection to be established. If timeout is reached, a 75*9c5db199SXin Li warning message is logged, but no other action is 76*9c5db199SXin Li taken. 77*9c5db199SXin Li """ 78*9c5db199SXin Li # Multiple processes might try in parallel to clean up the old main 79*9c5db199SXin Li # ssh connection and create a new one, therefore use a lock to protect 80*9c5db199SXin Li # against race conditions. 81*9c5db199SXin Li with self._lock: 82*9c5db199SXin Li # If a previously started main SSH connection is not running 83*9c5db199SXin Li # anymore, it needs to be cleaned up and then restarted. 84*9c5db199SXin Li if (self._main_job and (not os.path.exists(self._socket_path) or 85*9c5db199SXin Li self._main_job.sp.poll() is not None)): 86*9c5db199SXin Li logging.info( 87*9c5db199SXin Li 'Main-ssh connection to %s is down.', self._hostname) 88*9c5db199SXin Li self._close_internal() 89*9c5db199SXin Li 90*9c5db199SXin Li # Start a new main SSH connection. 91*9c5db199SXin Li if not self._main_job: 92*9c5db199SXin Li # Create a shared socket in a temp location. 93*9c5db199SXin Li self._main_tempdir = autotemp.tempdir(dir=_short_tmpdir()) 94*9c5db199SXin Li 95*9c5db199SXin Li # Start the main SSH connection in the background. 96*9c5db199SXin Li main_cmd = _MAIN_SSH_COMMAND_TEMPLATE % { 97*9c5db199SXin Li 'hostname': self._hostname, 98*9c5db199SXin Li 'user': self._user, 99*9c5db199SXin Li 'port': "-p %s" % self._port if self._port else "", 100*9c5db199SXin Li 'socket': self._socket_path, 101*9c5db199SXin Li } 102*9c5db199SXin Li logging.info( 103*9c5db199SXin Li 'Starting main-ssh connection \'%s\'', main_cmd) 104*9c5db199SXin Li self._main_job = utils.BgJob( 105*9c5db199SXin Li main_cmd, nickname='main-ssh', 106*9c5db199SXin Li stdout_tee=utils.DEVNULL, stderr_tee=utils.DEVNULL, 107*9c5db199SXin Li unjoinable=True) 108*9c5db199SXin Li 109*9c5db199SXin Li # To prevent a race between the main ssh connection 110*9c5db199SXin Li # startup and its first attempted use, wait for socket file to 111*9c5db199SXin Li # exist before returning. 112*9c5db199SXin Li try: 113*9c5db199SXin Li utils.poll_for_condition( 114*9c5db199SXin Li condition=lambda: os.path.exists(self._socket_path), 115*9c5db199SXin Li timeout=timeout, 116*9c5db199SXin Li sleep_interval=0.2, 117*9c5db199SXin Li desc='main-ssh connection up') 118*9c5db199SXin Li except utils.TimeoutError: 119*9c5db199SXin Li # poll_for_conditional already logs an error upon timeout 120*9c5db199SXin Li pass 121*9c5db199SXin Li 122*9c5db199SXin Li 123*9c5db199SXin Li def close(self): 124*9c5db199SXin Li """Releases all resources used by multiplexed ssh connection.""" 125*9c5db199SXin Li with self._lock: 126*9c5db199SXin Li self._close_internal() 127*9c5db199SXin Li 128*9c5db199SXin Li def _close_internal(self): 129*9c5db199SXin Li # Assume that when this is called, _lock should be acquired, already. 130*9c5db199SXin Li if self._main_job: 131*9c5db199SXin Li logging.debug('Nuking ssh main_job') 132*9c5db199SXin Li utils.nuke_subprocess(self._main_job.sp) 133*9c5db199SXin Li self._main_job = None 134*9c5db199SXin Li 135*9c5db199SXin Li if self._main_tempdir: 136*9c5db199SXin Li logging.debug('Cleaning ssh main_tempdir') 137*9c5db199SXin Li self._main_tempdir.clean() 138*9c5db199SXin Li self._main_tempdir = None 139*9c5db199SXin Li 140*9c5db199SXin Li 141*9c5db199SXin Liclass ConnectionPool(object): 142*9c5db199SXin Li """Holds SSH multiplex connection instance.""" 143*9c5db199SXin Li 144*9c5db199SXin Li def __init__(self): 145*9c5db199SXin Li self._pool = {} 146*9c5db199SXin Li self._lock = threading.Lock() 147*9c5db199SXin Li 148*9c5db199SXin Li def get(self, hostname, user, port): 149*9c5db199SXin Li """Returns MainSsh instance for the given endpoint. 150*9c5db199SXin Li 151*9c5db199SXin Li If the pool holds the instance already, returns it. If not, create the 152*9c5db199SXin Li instance, and returns it. 153*9c5db199SXin Li 154*9c5db199SXin Li Caller has the responsibility to call maybe_start() before using it. 155*9c5db199SXin Li 156*9c5db199SXin Li @param hostname: Host name of the endpoint. 157*9c5db199SXin Li @param user: User name to log in. 158*9c5db199SXin Li @param port: Port number sshd is listening. 159*9c5db199SXin Li """ 160*9c5db199SXin Li key = (hostname, user, port) 161*9c5db199SXin Li logging.debug('Get main ssh connection for %s@%s%s', user, hostname, 162*9c5db199SXin Li ":%s" % port if port else "") 163*9c5db199SXin Li 164*9c5db199SXin Li with self._lock: 165*9c5db199SXin Li conn = self._pool.get(key) 166*9c5db199SXin Li if not conn: 167*9c5db199SXin Li conn = MainSsh(hostname, user, port) 168*9c5db199SXin Li self._pool[key] = conn 169*9c5db199SXin Li return conn 170*9c5db199SXin Li 171*9c5db199SXin Li def shutdown(self): 172*9c5db199SXin Li """Closes all ssh multiplex connections.""" 173*9c5db199SXin Li for ssh in six.itervalues(self._pool): 174*9c5db199SXin Li ssh.close() 175*9c5db199SXin Li 176*9c5db199SXin Li 177*9c5db199SXin Lidef _short_tmpdir(): 178*9c5db199SXin Li # crbug/865171 Unix domain socket paths are limited to 108 characters. 179*9c5db199SXin Li # crbug/945523 Swarming does not like too many top-level directories in 180*9c5db199SXin Li # /tmp. 181*9c5db199SXin Li # So use a shared parent directory in /tmp 182*9c5db199SXin Li user = os.environ.get("USER", "no_USER")[:8] 183*9c5db199SXin Li d = '/tmp/ssh-main_%s' % user 184*9c5db199SXin Li if not os.path.exists(d): 185*9c5db199SXin Li os.mkdir(d) 186*9c5db199SXin Li return d 187