1*cda5da8dSAndroid Build Coastguard Worker# 2*cda5da8dSAndroid Build Coastguard Worker# A higher level module for using sockets (or Windows named pipes) 3*cda5da8dSAndroid Build Coastguard Worker# 4*cda5da8dSAndroid Build Coastguard Worker# multiprocessing/connection.py 5*cda5da8dSAndroid Build Coastguard Worker# 6*cda5da8dSAndroid Build Coastguard Worker# Copyright (c) 2006-2008, R Oudkerk 7*cda5da8dSAndroid Build Coastguard Worker# Licensed to PSF under a Contributor Agreement. 8*cda5da8dSAndroid Build Coastguard Worker# 9*cda5da8dSAndroid Build Coastguard Worker 10*cda5da8dSAndroid Build Coastguard Worker__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] 11*cda5da8dSAndroid Build Coastguard Worker 12*cda5da8dSAndroid Build Coastguard Workerimport io 13*cda5da8dSAndroid Build Coastguard Workerimport os 14*cda5da8dSAndroid Build Coastguard Workerimport sys 15*cda5da8dSAndroid Build Coastguard Workerimport socket 16*cda5da8dSAndroid Build Coastguard Workerimport struct 17*cda5da8dSAndroid Build Coastguard Workerimport time 18*cda5da8dSAndroid Build Coastguard Workerimport tempfile 19*cda5da8dSAndroid Build Coastguard Workerimport itertools 20*cda5da8dSAndroid Build Coastguard Worker 21*cda5da8dSAndroid Build Coastguard Workerimport _multiprocessing 22*cda5da8dSAndroid Build Coastguard Worker 23*cda5da8dSAndroid Build Coastguard Workerfrom . import util 24*cda5da8dSAndroid Build Coastguard Worker 25*cda5da8dSAndroid Build Coastguard Workerfrom . 
import AuthenticationError, BufferTooShort 26*cda5da8dSAndroid Build Coastguard Workerfrom .context import reduction 27*cda5da8dSAndroid Build Coastguard Worker_ForkingPickler = reduction.ForkingPickler 28*cda5da8dSAndroid Build Coastguard Worker 29*cda5da8dSAndroid Build Coastguard Workertry: 30*cda5da8dSAndroid Build Coastguard Worker import _winapi 31*cda5da8dSAndroid Build Coastguard Worker from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE 32*cda5da8dSAndroid Build Coastguard Workerexcept ImportError: 33*cda5da8dSAndroid Build Coastguard Worker if sys.platform == 'win32': 34*cda5da8dSAndroid Build Coastguard Worker raise 35*cda5da8dSAndroid Build Coastguard Worker _winapi = None 36*cda5da8dSAndroid Build Coastguard Worker 37*cda5da8dSAndroid Build Coastguard Worker# 38*cda5da8dSAndroid Build Coastguard Worker# 39*cda5da8dSAndroid Build Coastguard Worker# 40*cda5da8dSAndroid Build Coastguard Worker 41*cda5da8dSAndroid Build Coastguard WorkerBUFSIZE = 8192 42*cda5da8dSAndroid Build Coastguard Worker# A very generous timeout when it comes to local connections... 43*cda5da8dSAndroid Build Coastguard WorkerCONNECTION_TIMEOUT = 20. 
# Counter used to make generated named-pipe addresses unique per process.
_mmap_counter = itertools.count()

# Address families usable on this platform.  Each platform check below
# overrides the default chosen by the previous one.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the time.monotonic() deadline lying `timeout` seconds from now
    '''
    return time.monotonic() + timeout

def _check_timeout(t):
    '''
    Return True once the time.monotonic() clock has passed the deadline `t`
    '''
    return time.monotonic() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` is one of 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        # Port 0 is passed through unchanged for whoever binds the address.
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        # mktemp() only picks an unused name; nothing is created here.
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')

def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError for 'AF_PIPE' off Windows, and for 'AF_UNIX' on a
    Windows build whose socket module lacks that attribute.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)

def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'

    Tuples are 'AF_INET', strings starting with two backslashes are
    'AF_PIPE', and any other string (or an address accepted by
    util.is_abstract_socket_namespace) is 'AF_UNIX'.  Anything else raises
    ValueError.
    '''
    if type(address) is tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str or util.is_abstract_socket_namespace(address):
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Connection classes
#

class _ConnectionBase:
    """
    Common behaviour of the connection classes.

    Subclasses supply `_close`, `_send_bytes`, `_recv_bytes` and `_poll`;
    this class adds state checking, argument validation and pickling on top.
    """
    # Class-level default so __del__ is safe even if __init__ raised early.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        """
        Wrap `handle` (anything with __index__) as a connection.

        Raises ValueError for a negative handle or when both `readable`
        and `writable` are false.
        """
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        """Raise OSError if the connection has been closed"""
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        """Raise OSError if the connection cannot be read from"""
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        """Raise OSError if the connection cannot be written to"""
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        """
        Give up reading after an over-long message and raise OSError.

        A writable connection stays open but becomes write-only; a
        read-only connection is closed outright.
        """
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Mark as closed even if the underlying close failed.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """
        Send the bytes data from a bytes-like object

        Only `size` bytes starting at byte `offset` are sent; `size`
        defaults to everything after `offset`.  Raises ValueError when
        `offset`/`size` are negative or fall outside the buffer, and
        OSError when the connection is closed or read-only.
        """
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        if m.itemsize > 1:
            # Work in bytes regardless of the buffer's item type.
            m = m.cast('B')
        n = m.nbytes
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.

        Raises ValueError for a negative `maxlength`.  If the subclass
        reports the message as too long (by returning None), OSError is
        raised via _bad_message_length().
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.

        `offset` is a byte offset into `buf`.  Raises ValueError for an
        offset outside the buffer and BufferTooShort (carrying the whole
        message) when the message does not fit after `offset`.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            # The stream position sits at the end of the received data.
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE: unpickles whatever the peer sent; only use with trusted peers.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # When set, the next _recv_bytes() call returns an empty message
        # without reading; _poll() also reports it as available input.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            """Write `buf` with one overlapped WriteFile and wait for it"""
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Cancel the pending write on any interruption, then
                # still collect its result in the finally clause.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            """
            Read one message and return it as an io.BytesIO.

            Reads at most 128 bytes first and fetches the remainder through
            _get_more_data() when ReadFile reports ERROR_MORE_DATA.  A
            broken pipe is reported as EOFError.
            """
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            """
            Finish reading a message whose first part is held by `ov`.

            Calls _bad_message_length() (which raises OSError) if the whole
            message would exceed `maxsize`.
            """
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            # Second item of the PeekNamedPipe result is used as the number
            # of bytes still to read for this message.
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f


class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf`, looping over partial writes"""
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        """
        Read exactly `size` bytes and return them in an io.BytesIO.

        Raises EOFError if the stream ends before any byte arrives and
        OSError if it ends part-way through.
        """
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        """
        Send `buf` as one length-prefixed message.

        The prefix is a signed 32-bit network-order length; messages longer
        than 0x7fffffff bytes send -1 there, followed by an unsigned 64-bit
        length.
        """
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            # For wire compatibility with 3.7 and lower
            header = struct.pack("!i", n)
            if n > 16384:
                # The payload is large so Nagle's algorithm won't be triggered
                # and we'd better avoid the cost of concatenation.
                self._send(header)
                self._send(buf)
            else:
                # Issue #20540: concatenate before sending, to avoid delays due
                # to Nagle's algorithm on a TCP socket.
                # Also note we want to avoid sending a 0-length buffer separately,
                # to avoid "broken pipe" errors if the other end closed the pipe.
                self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """
        Read one length-prefixed message and return it as an io.BytesIO.

        Returns None, without reading the payload, when the announced
        length exceeds `maxsize`.
        """
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            # Extended header: the real length follows as 8 bytes.
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        r = wait([self], timeout)
        return bool(r)


#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Create the underlying socket or named-pipe listener.

        `family` defaults to the type of `address`, or to `default_family`
        when no address is given; a missing `address` is generated with
        arbitrary_address().  Raises ValueError for a family not valid on
        this platform and TypeError if `authkey` is neither None nor bytes.
        '''
        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)

        # Reject a bad authkey before anything is bound, so that a failed
        # constructor does not leave a listening socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.

        Raises OSError if the listener is closed.  When an authkey was
        given, the challenge handshake runs first; if it fails the new
        connection is closed and the error is re-raised.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # `is not None` matches the test Client() applies, so an empty
        # authkey is authenticated on both ends rather than only one.
        if self._authkey is not None:
            try:
                deliver_challenge(c, self._authkey)
                answer_challenge(c, self._authkey)
            except BaseException:
                # The connection is never handed to the caller; release it.
                c.close()
                raise
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            # Clear the reference first so a second close() is a no-op.
            self._listener = None
            listener.close()

    @property
    def address(self):
        return self._listener._address

    @property
    def last_accepted(self):
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    `family` names the address family; when it is omitted (or falsy) it is
    deduced from `address` via `address_type()`.  If `authkey` is given it
    must be a byte string; the connection then answers the peer's challenge
    and issues one of its own before being returned.

    Raises TypeError if `authkey` is neither None nor bytes.  Errors raised
    while connecting or authenticating are propagated; on an authentication
    failure the half-open connection is closed first.
    '''
    family = family or address_type(address)
    _validate_family(family)

    # Validate the key *before* connecting, so a bad argument cannot leave
    # a freshly opened connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        try:
            answer_challenge(c, authkey)
            deliver_challenge(c, authkey)
        except BaseException:
            # The caller never receives `c` on failure: release it here.
            c.close()
            raise

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends are built on a socket pair and are
        created with the default readable/writable flags.  Otherwise an
        OS pipe is used: the first connection is created with
        writable=False and the second with readable=False.
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            # detach() hands the raw descriptor over to the Connection.
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        The pair is built on a single-instance named pipe created at an
        arbitrary 'AF_PIPE' address.  With `duplex` false the first
        connection is created with writable=False and the second with
        readable=False.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        # Complete the server side of the connection and wait for the
        # overlapped operation to finish before wrapping the handles.
        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    A socket bound to an address and listening for incoming connections
    '''
    def __init__(self, address, family, backlog=1):
        sock = socket.socket(getattr(socket, family))
        self._socket = sock
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(True)
            sock.bind(address)
            sock.listen(backlog)
            self._address = sock.getsockname()
        except OSError:
            # Do not leave a half-configured socket open.
            sock.close()
            raise
        self._family = family
        self._last_accepted = None

        # Linux abstract socket namespaces do not need to be explicitly
        # unlinked; any other AF_UNIX address gets an unlink finalizer.
        needs_unlink = (
            family == 'AF_UNIX'
            and not util.is_abstract_socket_namespace(address)
            )
        self._unlink = (
            util.Finalize(self, os.unlink, args=(address,), exitpriority=0)
            if needs_unlink else None
            )

    def accept(self):
        '''
        Accept one connection and return it wrapped in a `Connection`;
        the peer address is remembered in `_last_accepted`.
        '''
        peer_sock, peer_address = self._socket.accept()
        self._last_accepted = peer_address
        peer_sock.setblocking(True)
        return Connection(peer_sock.detach())

    def close(self):
        '''
        Close the listening socket, then run the unlink finalizer (if any)
        at most once.
        '''
        try:
            self._socket.close()
        finally:
            pending, self._unlink = self._unlink, None
            if pending is not None:
                pending()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    af = getattr(socket, address_type(address))
    with socket.socket(af) as sock:
        sock.setblocking(True)
        sock.connect(address)
        # After detach() the socket object no longer owns the descriptor,
        # so leaving the `with` block does not close it.
        fd = sock.detach()
    return Connection(fd)

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            # `backlog` is accepted for signature parity but is not used.
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # close() is a finalizer that closes every handle still queued.
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            '''
            Create and return a new pipe instance handle for this address.
            `first` adds FILE_FLAG_FIRST_PIPE_INSTANCE to the open flags.
            '''
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            '''
            Wait for a client on the oldest queued handle and return it
            wrapped in a `PipeConnection`.
            '''
            # Queue a replacement instance before taking the oldest one,
            # so the queue never becomes empty.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except BaseException:
                    # Interrupted while waiting: abandon the pending
                    # connect and release the handle before re-raising.
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            '''
            Close every handle remaining in `queue`.
            '''
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Retries while the failure is ERROR_SEM_TIMEOUT or ERROR_PIPE_BUSY
        and `_check_timeout()` is false; any other OSError, or one raised
        after the timeout, is propagated.
        '''
        t = _init_timeout()
        # The loop is left only by `break` (connected) or by an exception;
        # the former `while ... else: raise` clause could never run.
        while True:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Send a random challenge over `connection` and verify the reply.

    The peer must answer with the HMAC-MD5 digest of the challenge keyed
    with `authkey`.  WELCOME is sent on success; otherwise FAILURE is sent
    and AuthenticationError is raised.  Raises ValueError if `authkey` is
    not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # NOTE: 'md5' is part of the wire protocol shared with the peer, so it
    # is kept as-is here.
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison: do not leak how much of the digest matched.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Receive a challenge from `connection` and answer it using `authkey`.

    Raises ValueError if `authkey` is not bytes, and AuthenticationError
    if the incoming message is not a challenge or the peer does not reply
    with WELCOME.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # Explicit check rather than `assert`: peer input must still be
    # validated when Python runs with -O.
    if not message.startswith(CHALLENGE):
        raise AuthenticationError(
            'protocol error, expected challenge: message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#

class ConnectionWrapper(object):
    '''
    Wrap a connection so that send()/recv() serialize objects with the
    supplied `dumps`/`loads` callables; fileno, close, poll, recv_bytes
    and send_bytes are forwarded to the wrapped connection unchanged.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        s = self._conn.recv_bytes()
        return self._loads(s)

def _xml_dumps(obj):
    # Imported locally so the helper also works when it is called before
    # XmlListener.accept() / XmlClient() has bound the module-level name.
    import xmlrpc.client as xmlrpclib
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')

def _xml_loads(s):
    # Local import for the same reason as in _xml_dumps().
    import xmlrpc.client as xmlrpclib
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj

class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects as XML-RPC.
    '''
    def accept(self):
        # The global binding is kept for backward compatibility.
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    '''
    Like `Client()`, but the returned connection serializes as XML-RPC.
    '''
    # The global binding is kept for backward compatibility.
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)

#
# Wait
#

if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        L = list(handles)
        ready = []
        while L:
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
                res -= WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
                res -= WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(L[res])
            # Only handles after the signalled one still need checking,
            # and those are polled without blocking.
            L = L[res+1:]
            timeout = 0
        return ready

    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.

        `timeout` is in seconds; None waits indefinitely and a negative
        value is treated as zero.
        '''
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            # seconds -> milliseconds, rounded to nearest
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        waithandle_to_obj = {}
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    # No fileno(): use the object's integer value as the
                    # wait handle.
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        # Something is already ready: do not block below.
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        # Preserve the caller's ordering in the result.
        return [o for o in object_list if o in ready_objects]

else:

    import selectors

    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrary to epoll/kqueue (also, they require a single
    # syscall).
    if hasattr(selectors, 'PollSelector'):
        _WaitSelector = selectors.PollSelector
    else:
        _WaitSelector = selectors.SelectSelector

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.

        `timeout` is in seconds; None waits indefinitely.  An empty list is
        returned once the deadline has passed with nothing ready.
        '''
        with _WaitSelector() as selector:
            for obj in object_list:
                selector.register(obj, selectors.EVENT_READ)

            if timeout is not None:
                deadline = time.monotonic() + timeout

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, events) in ready]
                else:
                    # Nothing ready: retry with whatever time remains.
                    if timeout is not None:
                        timeout = deadline - time.monotonic()
                        if timeout < 0:
                            return ready

#
# Make connection and socket objects shareable if possible
#

if sys.platform == 'win32':
    def reduce_connection(conn):
        '''
        Reducer for `Connection`: wrap its handle in a DupSocket and
        return the rebuild callable with its arguments.
        '''
        raw = conn.fileno()
        with socket.fromfd(raw, socket.AF_INET, socket.SOCK_STREAM) as tmp:
            from . import resource_sharer
            shared = resource_sharer.DupSocket(tmp)
            state = (shared, conn.readable, conn.writable)
            return rebuild_connection, state
    def rebuild_connection(ds, readable, writable):
        '''
        Recreate a `Connection` from the object produced by the reducer.
        '''
        return Connection(ds.detach().detach(), readable, writable)
    reduction.register(Connection, reduce_connection)

    def reduce_pipe_connection(conn):
        '''
        Reducer for `PipeConnection`: duplicate its handle with access
        rights matching the connection's readable/writable flags.
        '''
        access = 0
        if conn.readable:
            access |= _winapi.FILE_GENERIC_READ
        if conn.writable:
            access |= _winapi.FILE_GENERIC_WRITE
        shared = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (shared, conn.readable, conn.writable)
    def rebuild_pipe_connection(dh, readable, writable):
        '''
        Recreate a `PipeConnection` from the duplicated handle wrapper.
        '''
        return PipeConnection(dh.detach(), readable, writable)
    reduction.register(PipeConnection, reduce_pipe_connection)

else:
    def reduce_connection(conn):
        '''
        Reducer for `Connection`: wrap its descriptor in a DupFd and
        return the rebuild callable with its arguments.
        '''
        shared = reduction.DupFd(conn.fileno())
        state = (shared, conn.readable, conn.writable)
        return rebuild_connection, state
    def rebuild_connection(df, readable, writable):
        '''
        Recreate a `Connection` around the descriptor that `df` yields.
        '''
        return Connection(df.detach(), readable, writable)
    reduction.register(Connection, reduce_connection)