xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/multiprocessing/connection.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1#
2# A higher level module for using sockets (or Windows named pipes)
3#
4# multiprocessing/connection.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
11
12import io
13import os
14import sys
15import socket
16import struct
17import time
18import tempfile
19import itertools
20
21import _multiprocessing
22
23from . import util
24
25from . import AuthenticationError, BufferTooShort
26from .context import reduction
27_ForkingPickler = reduction.ForkingPickler
28
29try:
30    import _winapi
31    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
32except ImportError:
33    if sys.platform == 'win32':
34        raise
35    _winapi = None
36
37#
38#
39#
40
41BUFSIZE = 8192
42# A very generous timeout when it comes to local connections...
43CONNECTION_TIMEOUT = 20.
44
45_mmap_counter = itertools.count()
46
47default_family = 'AF_INET'
48families = ['AF_INET']
49
50if hasattr(socket, 'AF_UNIX'):
51    default_family = 'AF_UNIX'
52    families += ['AF_UNIX']
53
54if sys.platform == 'win32':
55    default_family = 'AF_PIPE'
56    families += ['AF_PIPE']
57
58
59def _init_timeout(timeout=CONNECTION_TIMEOUT):
60    return time.monotonic() + timeout
61
62def _check_timeout(t):
63    return time.monotonic() > t
64
65#
66#
67#
68
69def arbitrary_address(family):
70    '''
71    Return an arbitrary free address for the given family
72    '''
73    if family == 'AF_INET':
74        return ('localhost', 0)
75    elif family == 'AF_UNIX':
76        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
77    elif family == 'AF_PIPE':
78        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
79                               (os.getpid(), next(_mmap_counter)), dir="")
80    else:
81        raise ValueError('unrecognized family')
82
83def _validate_family(family):
84    '''
85    Checks if the family is valid for the current environment.
86    '''
87    if sys.platform != 'win32' and family == 'AF_PIPE':
88        raise ValueError('Family %s is not recognized.' % family)
89
90    if sys.platform == 'win32' and family == 'AF_UNIX':
91        # double check
92        if not hasattr(socket, family):
93            raise ValueError('Family %s is not recognized.' % family)
94
95def address_type(address):
96    '''
97    Return the types of the address
98
99    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
100    '''
101    if type(address) == tuple:
102        return 'AF_INET'
103    elif type(address) is str and address.startswith('\\\\'):
104        return 'AF_PIPE'
105    elif type(address) is str or util.is_abstract_socket_namespace(address):
106        return 'AF_UNIX'
107    else:
108        raise ValueError('address type of %r unrecognized' % address)
109
110#
111# Connection classes
112#
113
114class _ConnectionBase:
115    _handle = None
116
117    def __init__(self, handle, readable=True, writable=True):
118        handle = handle.__index__()
119        if handle < 0:
120            raise ValueError("invalid handle")
121        if not readable and not writable:
122            raise ValueError(
123                "at least one of `readable` and `writable` must be True")
124        self._handle = handle
125        self._readable = readable
126        self._writable = writable
127
128    # XXX should we use util.Finalize instead of a __del__?
129
130    def __del__(self):
131        if self._handle is not None:
132            self._close()
133
134    def _check_closed(self):
135        if self._handle is None:
136            raise OSError("handle is closed")
137
138    def _check_readable(self):
139        if not self._readable:
140            raise OSError("connection is write-only")
141
142    def _check_writable(self):
143        if not self._writable:
144            raise OSError("connection is read-only")
145
146    def _bad_message_length(self):
147        if self._writable:
148            self._readable = False
149        else:
150            self.close()
151        raise OSError("bad message length")
152
153    @property
154    def closed(self):
155        """True if the connection is closed"""
156        return self._handle is None
157
158    @property
159    def readable(self):
160        """True if the connection is readable"""
161        return self._readable
162
163    @property
164    def writable(self):
165        """True if the connection is writable"""
166        return self._writable
167
168    def fileno(self):
169        """File descriptor or handle of the connection"""
170        self._check_closed()
171        return self._handle
172
173    def close(self):
174        """Close the connection"""
175        if self._handle is not None:
176            try:
177                self._close()
178            finally:
179                self._handle = None
180
181    def send_bytes(self, buf, offset=0, size=None):
182        """Send the bytes data from a bytes-like object"""
183        self._check_closed()
184        self._check_writable()
185        m = memoryview(buf)
186        if m.itemsize > 1:
187            m = m.cast('B')
188        n = m.nbytes
189        if offset < 0:
190            raise ValueError("offset is negative")
191        if n < offset:
192            raise ValueError("buffer length < offset")
193        if size is None:
194            size = n - offset
195        elif size < 0:
196            raise ValueError("size is negative")
197        elif offset + size > n:
198            raise ValueError("buffer length < offset + size")
199        self._send_bytes(m[offset:offset + size])
200
201    def send(self, obj):
202        """Send a (picklable) object"""
203        self._check_closed()
204        self._check_writable()
205        self._send_bytes(_ForkingPickler.dumps(obj))
206
207    def recv_bytes(self, maxlength=None):
208        """
209        Receive bytes data as a bytes object.
210        """
211        self._check_closed()
212        self._check_readable()
213        if maxlength is not None and maxlength < 0:
214            raise ValueError("negative maxlength")
215        buf = self._recv_bytes(maxlength)
216        if buf is None:
217            self._bad_message_length()
218        return buf.getvalue()
219
220    def recv_bytes_into(self, buf, offset=0):
221        """
222        Receive bytes data into a writeable bytes-like object.
223        Return the number of bytes read.
224        """
225        self._check_closed()
226        self._check_readable()
227        with memoryview(buf) as m:
228            # Get bytesize of arbitrary buffer
229            itemsize = m.itemsize
230            bytesize = itemsize * len(m)
231            if offset < 0:
232                raise ValueError("negative offset")
233            elif offset > bytesize:
234                raise ValueError("offset too large")
235            result = self._recv_bytes()
236            size = result.tell()
237            if bytesize < offset + size:
238                raise BufferTooShort(result.getvalue())
239            # Message can fit in dest
240            result.seek(0)
241            result.readinto(m[offset // itemsize :
242                              (offset + size) // itemsize])
243            return size
244
245    def recv(self):
246        """Receive a (picklable) object"""
247        self._check_closed()
248        self._check_readable()
249        buf = self._recv_bytes()
250        return _ForkingPickler.loads(buf.getbuffer())
251
252    def poll(self, timeout=0.0):
253        """Whether there is any input available to be read"""
254        self._check_closed()
255        self._check_readable()
256        return self._poll(timeout)
257
258    def __enter__(self):
259        return self
260
261    def __exit__(self, exc_type, exc_value, exc_tb):
262        self.close()
263
264
265if _winapi:
266
267    class PipeConnection(_ConnectionBase):
268        """
269        Connection class based on a Windows named pipe.
270        Overlapped I/O is used, so the handles must have been created
271        with FILE_FLAG_OVERLAPPED.
272        """
273        _got_empty_message = False
274
275        def _close(self, _CloseHandle=_winapi.CloseHandle):
276            _CloseHandle(self._handle)
277
278        def _send_bytes(self, buf):
279            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
280            try:
281                if err == _winapi.ERROR_IO_PENDING:
282                    waitres = _winapi.WaitForMultipleObjects(
283                        [ov.event], False, INFINITE)
284                    assert waitres == WAIT_OBJECT_0
285            except:
286                ov.cancel()
287                raise
288            finally:
289                nwritten, err = ov.GetOverlappedResult(True)
290            assert err == 0
291            assert nwritten == len(buf)
292
293        def _recv_bytes(self, maxsize=None):
294            if self._got_empty_message:
295                self._got_empty_message = False
296                return io.BytesIO()
297            else:
298                bsize = 128 if maxsize is None else min(maxsize, 128)
299                try:
300                    ov, err = _winapi.ReadFile(self._handle, bsize,
301                                                overlapped=True)
302                    try:
303                        if err == _winapi.ERROR_IO_PENDING:
304                            waitres = _winapi.WaitForMultipleObjects(
305                                [ov.event], False, INFINITE)
306                            assert waitres == WAIT_OBJECT_0
307                    except:
308                        ov.cancel()
309                        raise
310                    finally:
311                        nread, err = ov.GetOverlappedResult(True)
312                        if err == 0:
313                            f = io.BytesIO()
314                            f.write(ov.getbuffer())
315                            return f
316                        elif err == _winapi.ERROR_MORE_DATA:
317                            return self._get_more_data(ov, maxsize)
318                except OSError as e:
319                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
320                        raise EOFError
321                    else:
322                        raise
323            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
324
325        def _poll(self, timeout):
326            if (self._got_empty_message or
327                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
328                return True
329            return bool(wait([self], timeout))
330
331        def _get_more_data(self, ov, maxsize):
332            buf = ov.getbuffer()
333            f = io.BytesIO()
334            f.write(buf)
335            left = _winapi.PeekNamedPipe(self._handle)[1]
336            assert left > 0
337            if maxsize is not None and len(buf) + left > maxsize:
338                self._bad_message_length()
339            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
340            rbytes, err = ov.GetOverlappedResult(True)
341            assert err == 0
342            assert rbytes == left
343            f.write(ov.getbuffer())
344            return f
345
346
347class Connection(_ConnectionBase):
348    """
349    Connection class based on an arbitrary file descriptor (Unix only), or
350    a socket handle (Windows).
351    """
352
353    if _winapi:
354        def _close(self, _close=_multiprocessing.closesocket):
355            _close(self._handle)
356        _write = _multiprocessing.send
357        _read = _multiprocessing.recv
358    else:
359        def _close(self, _close=os.close):
360            _close(self._handle)
361        _write = os.write
362        _read = os.read
363
364    def _send(self, buf, write=_write):
365        remaining = len(buf)
366        while True:
367            n = write(self._handle, buf)
368            remaining -= n
369            if remaining == 0:
370                break
371            buf = buf[n:]
372
373    def _recv(self, size, read=_read):
374        buf = io.BytesIO()
375        handle = self._handle
376        remaining = size
377        while remaining > 0:
378            chunk = read(handle, remaining)
379            n = len(chunk)
380            if n == 0:
381                if remaining == size:
382                    raise EOFError
383                else:
384                    raise OSError("got end of file during message")
385            buf.write(chunk)
386            remaining -= n
387        return buf
388
389    def _send_bytes(self, buf):
390        n = len(buf)
391        if n > 0x7fffffff:
392            pre_header = struct.pack("!i", -1)
393            header = struct.pack("!Q", n)
394            self._send(pre_header)
395            self._send(header)
396            self._send(buf)
397        else:
398            # For wire compatibility with 3.7 and lower
399            header = struct.pack("!i", n)
400            if n > 16384:
401                # The payload is large so Nagle's algorithm won't be triggered
402                # and we'd better avoid the cost of concatenation.
403                self._send(header)
404                self._send(buf)
405            else:
406                # Issue #20540: concatenate before sending, to avoid delays due
407                # to Nagle's algorithm on a TCP socket.
408                # Also note we want to avoid sending a 0-length buffer separately,
409                # to avoid "broken pipe" errors if the other end closed the pipe.
410                self._send(header + buf)
411
412    def _recv_bytes(self, maxsize=None):
413        buf = self._recv(4)
414        size, = struct.unpack("!i", buf.getvalue())
415        if size == -1:
416            buf = self._recv(8)
417            size, = struct.unpack("!Q", buf.getvalue())
418        if maxsize is not None and size > maxsize:
419            return None
420        return self._recv(size)
421
422    def _poll(self, timeout):
423        r = wait([self], timeout)
424        return bool(r)
425
426
427#
428# Public functions
429#
430
431class Listener(object):
432    '''
433    Returns a listener object.
434
435    This is a wrapper for a bound socket which is 'listening' for
436    connections, or for a Windows named pipe.
437    '''
438    def __init__(self, address=None, family=None, backlog=1, authkey=None):
439        family = family or (address and address_type(address)) \
440                 or default_family
441        address = address or arbitrary_address(family)
442
443        _validate_family(family)
444        if family == 'AF_PIPE':
445            self._listener = PipeListener(address, backlog)
446        else:
447            self._listener = SocketListener(address, family, backlog)
448
449        if authkey is not None and not isinstance(authkey, bytes):
450            raise TypeError('authkey should be a byte string')
451
452        self._authkey = authkey
453
454    def accept(self):
455        '''
456        Accept a connection on the bound socket or named pipe of `self`.
457
458        Returns a `Connection` object.
459        '''
460        if self._listener is None:
461            raise OSError('listener is closed')
462        c = self._listener.accept()
463        if self._authkey:
464            deliver_challenge(c, self._authkey)
465            answer_challenge(c, self._authkey)
466        return c
467
468    def close(self):
469        '''
470        Close the bound socket or named pipe of `self`.
471        '''
472        listener = self._listener
473        if listener is not None:
474            self._listener = None
475            listener.close()
476
477    @property
478    def address(self):
479        return self._listener._address
480
481    @property
482    def last_accepted(self):
483        return self._listener._last_accepted
484
485    def __enter__(self):
486        return self
487
488    def __exit__(self, exc_type, exc_value, exc_tb):
489        self.close()
490
491
492def Client(address, family=None, authkey=None):
493    '''
494    Returns a connection to the address of a `Listener`
495    '''
496    family = family or address_type(address)
497    _validate_family(family)
498    if family == 'AF_PIPE':
499        c = PipeClient(address)
500    else:
501        c = SocketClient(address)
502
503    if authkey is not None and not isinstance(authkey, bytes):
504        raise TypeError('authkey should be a byte string')
505
506    if authkey is not None:
507        answer_challenge(c, authkey)
508        deliver_challenge(c, authkey)
509
510    return c
511
512
513if sys.platform != 'win32':
514
515    def Pipe(duplex=True):
516        '''
517        Returns pair of connection objects at either end of a pipe
518        '''
519        if duplex:
520            s1, s2 = socket.socketpair()
521            s1.setblocking(True)
522            s2.setblocking(True)
523            c1 = Connection(s1.detach())
524            c2 = Connection(s2.detach())
525        else:
526            fd1, fd2 = os.pipe()
527            c1 = Connection(fd1, writable=False)
528            c2 = Connection(fd2, readable=False)
529
530        return c1, c2
531
532else:
533
534    def Pipe(duplex=True):
535        '''
536        Returns pair of connection objects at either end of a pipe
537        '''
538        address = arbitrary_address('AF_PIPE')
539        if duplex:
540            openmode = _winapi.PIPE_ACCESS_DUPLEX
541            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
542            obsize, ibsize = BUFSIZE, BUFSIZE
543        else:
544            openmode = _winapi.PIPE_ACCESS_INBOUND
545            access = _winapi.GENERIC_WRITE
546            obsize, ibsize = 0, BUFSIZE
547
548        h1 = _winapi.CreateNamedPipe(
549            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
550            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
551            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
552            _winapi.PIPE_WAIT,
553            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
554            # default security descriptor: the handle cannot be inherited
555            _winapi.NULL
556            )
557        h2 = _winapi.CreateFile(
558            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
559            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
560            )
561        _winapi.SetNamedPipeHandleState(
562            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
563            )
564
565        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
566        _, err = overlapped.GetOverlappedResult(True)
567        assert err == 0
568
569        c1 = PipeConnection(h1, writable=duplex)
570        c2 = PipeConnection(h2, readable=duplex)
571
572        return c1, c2
573
574#
575# Definitions for connections based on sockets
576#
577
578class SocketListener(object):
579    '''
580    Representation of a socket which is bound to an address and listening
581    '''
582    def __init__(self, address, family, backlog=1):
583        self._socket = socket.socket(getattr(socket, family))
584        try:
585            # SO_REUSEADDR has different semantics on Windows (issue #2550).
586            if os.name == 'posix':
587                self._socket.setsockopt(socket.SOL_SOCKET,
588                                        socket.SO_REUSEADDR, 1)
589            self._socket.setblocking(True)
590            self._socket.bind(address)
591            self._socket.listen(backlog)
592            self._address = self._socket.getsockname()
593        except OSError:
594            self._socket.close()
595            raise
596        self._family = family
597        self._last_accepted = None
598
599        if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
600            # Linux abstract socket namespaces do not need to be explicitly unlinked
601            self._unlink = util.Finalize(
602                self, os.unlink, args=(address,), exitpriority=0
603                )
604        else:
605            self._unlink = None
606
607    def accept(self):
608        s, self._last_accepted = self._socket.accept()
609        s.setblocking(True)
610        return Connection(s.detach())
611
612    def close(self):
613        try:
614            self._socket.close()
615        finally:
616            unlink = self._unlink
617            if unlink is not None:
618                self._unlink = None
619                unlink()
620
621
622def SocketClient(address):
623    '''
624    Return a connection object connected to the socket given by `address`
625    '''
626    family = address_type(address)
627    with socket.socket( getattr(socket, family) ) as s:
628        s.setblocking(True)
629        s.connect(address)
630        return Connection(s.detach())
631
632#
633# Definitions for connections based on named pipes
634#
635
636if sys.platform == 'win32':
637
638    class PipeListener(object):
639        '''
640        Representation of a named pipe
641        '''
642        def __init__(self, address, backlog=None):
643            self._address = address
644            self._handle_queue = [self._new_handle(first=True)]
645
646            self._last_accepted = None
647            util.sub_debug('listener created with address=%r', self._address)
648            self.close = util.Finalize(
649                self, PipeListener._finalize_pipe_listener,
650                args=(self._handle_queue, self._address), exitpriority=0
651                )
652
653        def _new_handle(self, first=False):
654            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
655            if first:
656                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
657            return _winapi.CreateNamedPipe(
658                self._address, flags,
659                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
660                _winapi.PIPE_WAIT,
661                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
662                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
663                )
664
665        def accept(self):
666            self._handle_queue.append(self._new_handle())
667            handle = self._handle_queue.pop(0)
668            try:
669                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
670            except OSError as e:
671                if e.winerror != _winapi.ERROR_NO_DATA:
672                    raise
673                # ERROR_NO_DATA can occur if a client has already connected,
674                # written data and then disconnected -- see Issue 14725.
675            else:
676                try:
677                    res = _winapi.WaitForMultipleObjects(
678                        [ov.event], False, INFINITE)
679                except:
680                    ov.cancel()
681                    _winapi.CloseHandle(handle)
682                    raise
683                finally:
684                    _, err = ov.GetOverlappedResult(True)
685                    assert err == 0
686            return PipeConnection(handle)
687
688        @staticmethod
689        def _finalize_pipe_listener(queue, address):
690            util.sub_debug('closing listener with address=%r', address)
691            for handle in queue:
692                _winapi.CloseHandle(handle)
693
694    def PipeClient(address):
695        '''
696        Return a connection object connected to the pipe given by `address`
697        '''
698        t = _init_timeout()
699        while 1:
700            try:
701                _winapi.WaitNamedPipe(address, 1000)
702                h = _winapi.CreateFile(
703                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
704                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
705                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
706                    )
707            except OSError as e:
708                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
709                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
710                    raise
711            else:
712                break
713        else:
714            raise
715
716        _winapi.SetNamedPipeHandleState(
717            h, _winapi.PIPE_READMODE_MESSAGE, None, None
718            )
719        return PipeConnection(h)
720
721#
722# Authentication stuff
723#
724
725MESSAGE_LENGTH = 20
726
727CHALLENGE = b'#CHALLENGE#'
728WELCOME = b'#WELCOME#'
729FAILURE = b'#FAILURE#'
730
731def deliver_challenge(connection, authkey):
732    import hmac
733    if not isinstance(authkey, bytes):
734        raise ValueError(
735            "Authkey must be bytes, not {0!s}".format(type(authkey)))
736    message = os.urandom(MESSAGE_LENGTH)
737    connection.send_bytes(CHALLENGE + message)
738    digest = hmac.new(authkey, message, 'md5').digest()
739    response = connection.recv_bytes(256)        # reject large message
740    if response == digest:
741        connection.send_bytes(WELCOME)
742    else:
743        connection.send_bytes(FAILURE)
744        raise AuthenticationError('digest received was wrong')
745
746def answer_challenge(connection, authkey):
747    import hmac
748    if not isinstance(authkey, bytes):
749        raise ValueError(
750            "Authkey must be bytes, not {0!s}".format(type(authkey)))
751    message = connection.recv_bytes(256)         # reject large message
752    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
753    message = message[len(CHALLENGE):]
754    digest = hmac.new(authkey, message, 'md5').digest()
755    connection.send_bytes(digest)
756    response = connection.recv_bytes(256)        # reject large message
757    if response != WELCOME:
758        raise AuthenticationError('digest sent was rejected')
759
760#
761# Support for using xmlrpclib for serialization
762#
763
764class ConnectionWrapper(object):
765    def __init__(self, conn, dumps, loads):
766        self._conn = conn
767        self._dumps = dumps
768        self._loads = loads
769        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
770            obj = getattr(conn, attr)
771            setattr(self, attr, obj)
772    def send(self, obj):
773        s = self._dumps(obj)
774        self._conn.send_bytes(s)
775    def recv(self):
776        s = self._conn.recv_bytes()
777        return self._loads(s)
778
779def _xml_dumps(obj):
780    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
781
782def _xml_loads(s):
783    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
784    return obj
785
786class XmlListener(Listener):
787    def accept(self):
788        global xmlrpclib
789        import xmlrpc.client as xmlrpclib
790        obj = Listener.accept(self)
791        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
792
793def XmlClient(*args, **kwds):
794    global xmlrpclib
795    import xmlrpc.client as xmlrpclib
796    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
797
798#
799# Wait
800#
801
802if sys.platform == 'win32':
803
804    def _exhaustive_wait(handles, timeout):
805        # Return ALL handles which are currently signalled.  (Only
806        # returning the first signalled might create starvation issues.)
807        L = list(handles)
808        ready = []
809        while L:
810            res = _winapi.WaitForMultipleObjects(L, False, timeout)
811            if res == WAIT_TIMEOUT:
812                break
813            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
814                res -= WAIT_OBJECT_0
815            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
816                res -= WAIT_ABANDONED_0
817            else:
818                raise RuntimeError('Should not get here')
819            ready.append(L[res])
820            L = L[res+1:]
821            timeout = 0
822        return ready
823
824    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
825
826    def wait(object_list, timeout=None):
827        '''
828        Wait till an object in object_list is ready/readable.
829
830        Returns list of those objects in object_list which are ready/readable.
831        '''
832        if timeout is None:
833            timeout = INFINITE
834        elif timeout < 0:
835            timeout = 0
836        else:
837            timeout = int(timeout * 1000 + 0.5)
838
839        object_list = list(object_list)
840        waithandle_to_obj = {}
841        ov_list = []
842        ready_objects = set()
843        ready_handles = set()
844
845        try:
846            for o in object_list:
847                try:
848                    fileno = getattr(o, 'fileno')
849                except AttributeError:
850                    waithandle_to_obj[o.__index__()] = o
851                else:
852                    # start an overlapped read of length zero
853                    try:
854                        ov, err = _winapi.ReadFile(fileno(), 0, True)
855                    except OSError as e:
856                        ov, err = None, e.winerror
857                        if err not in _ready_errors:
858                            raise
859                    if err == _winapi.ERROR_IO_PENDING:
860                        ov_list.append(ov)
861                        waithandle_to_obj[ov.event] = o
862                    else:
863                        # If o.fileno() is an overlapped pipe handle and
864                        # err == 0 then there is a zero length message
865                        # in the pipe, but it HAS NOT been consumed...
866                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
867                            # ... except on Windows 8 and later, where
868                            # the message HAS been consumed.
869                            try:
870                                _, err = ov.GetOverlappedResult(False)
871                            except OSError as e:
872                                err = e.winerror
873                            if not err and hasattr(o, '_got_empty_message'):
874                                o._got_empty_message = True
875                        ready_objects.add(o)
876                        timeout = 0
877
878            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
879        finally:
880            # request that overlapped reads stop
881            for ov in ov_list:
882                ov.cancel()
883
884            # wait for all overlapped reads to stop
885            for ov in ov_list:
886                try:
887                    _, err = ov.GetOverlappedResult(True)
888                except OSError as e:
889                    err = e.winerror
890                    if err not in _ready_errors:
891                        raise
892                if err != _winapi.ERROR_OPERATION_ABORTED:
893                    o = waithandle_to_obj[ov.event]
894                    ready_objects.add(o)
895                    if err == 0:
896                        # If o.fileno() is an overlapped pipe handle then
897                        # a zero length message HAS been consumed.
898                        if hasattr(o, '_got_empty_message'):
899                            o._got_empty_message = True
900
901        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
902        return [o for o in object_list if o in ready_objects]
903
904else:
905
906    import selectors
907
908    # poll/select have the advantage of not requiring any extra file
909    # descriptor, contrarily to epoll/kqueue (also, they require a single
910    # syscall).
911    if hasattr(selectors, 'PollSelector'):
912        _WaitSelector = selectors.PollSelector
913    else:
914        _WaitSelector = selectors.SelectSelector
915
916    def wait(object_list, timeout=None):
917        '''
918        Wait till an object in object_list is ready/readable.
919
920        Returns list of those objects in object_list which are ready/readable.
921        '''
922        with _WaitSelector() as selector:
923            for obj in object_list:
924                selector.register(obj, selectors.EVENT_READ)
925
926            if timeout is not None:
927                deadline = time.monotonic() + timeout
928
929            while True:
930                ready = selector.select(timeout)
931                if ready:
932                    return [key.fileobj for (key, events) in ready]
933                else:
934                    if timeout is not None:
935                        timeout = deadline - time.monotonic()
936                        if timeout < 0:
937                            return ready
938
939#
940# Make connection and socket objects shareable if possible
941#
942
943if sys.platform == 'win32':
944    def reduce_connection(conn):
945        handle = conn.fileno()
946        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
947            from . import resource_sharer
948            ds = resource_sharer.DupSocket(s)
949            return rebuild_connection, (ds, conn.readable, conn.writable)
950    def rebuild_connection(ds, readable, writable):
951        sock = ds.detach()
952        return Connection(sock.detach(), readable, writable)
953    reduction.register(Connection, reduce_connection)
954
955    def reduce_pipe_connection(conn):
956        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
957                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
958        dh = reduction.DupHandle(conn.fileno(), access)
959        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
960    def rebuild_pipe_connection(dh, readable, writable):
961        handle = dh.detach()
962        return PipeConnection(handle, readable, writable)
963    reduction.register(PipeConnection, reduce_pipe_connection)
964
965else:
966    def reduce_connection(conn):
967        df = reduction.DupFd(conn.fileno())
968        return rebuild_connection, (df, conn.readable, conn.writable)
969    def rebuild_connection(df, readable, writable):
970        fd = df.detach()
971        return Connection(fd, readable, writable)
972    reduction.register(Connection, reduce_connection)
973