#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import io
import os
import sys
import socket
import struct
import time
import tempfile
import itertools

import _multiprocessing

from . import util

from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler

try:
    import _winapi
    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
except ImportError:
    if sys.platform == 'win32':
        raise
    _winapi = None

#
#
#

BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

_mmap_counter = itertools.count()

default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the monotonic-clock deadline that lies `timeout` seconds from now
    '''
    return time.monotonic() + timeout

def _check_timeout(t):
    '''
    Return True if the monotonic-clock deadline `t` has already passed
    '''
    return time.monotonic() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')

def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError for 'AF_PIPE' off Windows, and for 'AF_UNIX' on
    Windows when the socket module does not provide it.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)

def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
    '''
    if type(address) is tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str or util.is_abstract_socket_namespace(address):
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Connection classes
#

class _ConnectionBase:
    '''
    Common behaviour of connection objects wrapping an integer handle.

    Subclasses supply `_close()`, `_send_bytes(buf)`, `_recv_bytes(maxsize)`
    and `_poll(timeout)`; this class adds argument checking, pickling of
    objects and context-manager support on top of them.
    '''
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        '''Raise OSError if the handle has already been closed'''
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        '''Raise OSError if the connection cannot be read from'''
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        '''Raise OSError if the connection cannot be written to'''
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        '''
        Give up on reading after an over-long message and raise OSError.

        A writable connection is only marked unreadable; a read-only one
        is closed, since nothing more can be done with it.
        '''
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Forget the handle even if _close() raised, so that it is
                # never closed twice.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """Send the bytes data from a bytes-like object"""
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        if m.itemsize > 1:
            # Work in bytes so that offset and size index single bytes.
            m = m.cast('B')
        n = m.nbytes
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            # _recv_bytes() signals a message longer than maxlength with
            # None; the call below always raises.
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE: this unpickles whatever the peer sent; only use it with
        # peers that are trusted or have been authenticated.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # Set by wait() when a zero length message has already been consumed
        # from the pipe; the next _recv_bytes() then returns an empty buffer.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            '''Write `buf` to the pipe as one overlapped write'''
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Bare except on purpose: the pending write must also be
                # cancelled on KeyboardInterrupt.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            '''
            Read one message from the pipe and return it as an io.BytesIO.

            Raises EOFError if the pipe is broken.
            '''
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        # Bare except on purpose: cancel the pending read
                        # on KeyboardInterrupt as well.
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            '''
            Read the rest of a message whose start is already held by `ov`.

            Returns an io.BytesIO with the whole message.  If the message is
            longer than `maxsize`, _bad_message_length() raises OSError.
            '''
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f


class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        '''Write all of `buf` to the handle, retrying after short writes'''
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        '''
        Read exactly `size` bytes from the handle into a new io.BytesIO.

        Raises EOFError if the stream ends before any byte was read, and
        OSError if it ends part way through.
        '''
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        '''
        Send `buf` prefixed with its length.

        The length is a signed 32-bit network-order integer; messages
        larger than 0x7fffffff bytes send -1 followed by an unsigned 64-bit
        length instead.
        '''
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            # For wire compatibility with 3.7 and lower
            header = struct.pack("!i", n)
            if n > 16384:
                # The payload is large so Nagle's algorithm won't be triggered
                # and we'd better avoid the cost of concatenation.
                self._send(header)
                self._send(buf)
            else:
                # Issue #20540: concatenate before sending, to avoid delays due
                # to Nagle's algorithm on a TCP socket.
                # Also note we want to avoid sending a 0-length buffer separately,
                # to avoid "broken pipe" errors if the other end closed the pipe.
                self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        '''
        Read one length-prefixed message and return it as an io.BytesIO.

        Returns None, without reading the payload, if the announced length
        is greater than `maxsize`.
        '''
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        r = wait([self], timeout)
        return bool(r)


#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Check the key before binding anything, so that a bad `authkey`
        # does not leave a bound socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # Same test as Client(): an empty key still authenticates, otherwise
        # the two ends would disagree on whether to run the handshake.
        if self._authkey is not None:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            self._listener = None
            listener.close()

    @property
    def address(self):
        return self._listener._address

    @property
    def last_accepted(self):
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`
    '''
    # Check the key before connecting, so that a bad `authkey` does not
    # leave an open connection behind.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except OSError:
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
            # Linux abstract socket namespaces do not need to be explicitly unlinked
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''Accept one connection and return it as a `Connection`'''
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        '''Close the socket and run the pending unlink finalizer, if any'''
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    with socket.socket( getattr(socket, family) ) as s:
        s.setblocking(True)
        s.connect(address)
        return Connection(s.detach())

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            '''Create and return a new instance of the named pipe'''
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            '''
            Wait for a client on the oldest queued pipe instance and return
            it as a `PipeConnection`.
            '''
            # Queue a fresh instance before taking the oldest one off the
            # front, so the queue never becomes empty.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    # Bare except on purpose: also clean up on
                    # KeyboardInterrupt.
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        t = _init_timeout()
        while 1:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                # Retry while the pipe is merely busy or the wait timed out,
                # until the overall deadline `t` has passed.
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove that it knows `authkey`.

    Sends CHALLENGE followed by MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes, keyed with `authkey`, in reply.  Sends
    WELCOME on success; on a mismatch sends FAILURE and raises
    AuthenticationError.  Raises ValueError if `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # Constant-time comparison, so that timing does not reveal how much of
    # a guessed digest was correct.
    if hmac.compare_digest(response, digest):
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge sent by the peer's deliver_challenge().

    Replies with the HMAC-MD5 digest of the challenge bytes, keyed with
    `authkey`.  Raises AuthenticationError if the message received is not
    a challenge or if the peer does not answer WELCOME, and ValueError if
    `authkey` is not bytes.
    '''
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = connection.recv_bytes(256)         # reject large message
    # Raise rather than assert: the peer's data must still be checked when
    # Python runs with -O, which strips assert statements.
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AuthenticationError(
            'expected challenge, got message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, 'md5').digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#

class ConnectionWrapper(object):
    '''
    Wrap `conn` so that send() and recv() serialize with the given
    `dumps` and `loads` callables instead of pickle.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        # Expose the byte-level methods of the wrapped connection directly.
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        s = self._conn.recv_bytes()
        return self._loads(s)

def _xml_dumps(obj):
    # `xmlrpclib` is bound as a module global by XmlListener.accept() or
    # XmlClient() before this can be called through a ConnectionWrapper.
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')

def _xml_loads(s):
    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
    return obj

class XmlListener(Listener):
    '''Listener whose accepted connections serialize objects as XML-RPC'''
    def accept(self):
        global xmlrpclib
        import xmlrpc.client as xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    '''Like Client(), but the connection serializes objects as XML-RPC'''
    global xmlrpclib
    import xmlrpc.client as xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)

#
# Wait
#

if sys.platform == 'win32':

    def _exhaustive_wait(handles, timeout):
        # Return ALL handles which are currently signalled.  (Only
        # returning the first signalled might create starvation issues.)
        L = list(handles)
        ready = []
        while L:
            res = _winapi.WaitForMultipleObjects(L, False, timeout)
            if res == WAIT_TIMEOUT:
                break
            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
                res -= WAIT_OBJECT_0
            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
                res -= WAIT_ABANDONED_0
            else:
                raise RuntimeError('Should not get here')
            ready.append(L[res])
            # Only the handles after the signalled one still need checking,
            # and without blocking from now on.
            L = L[res+1:]
            timeout = 0
        return ready

    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        # Convert seconds to the integer milliseconds used by _winapi.
        if timeout is None:
            timeout = INFINITE
        elif timeout < 0:
            timeout = 0
        else:
            timeout = int(timeout * 1000 + 0.5)

        object_list = list(object_list)
        waithandle_to_obj = {}
        ov_list = []
        ready_objects = set()
        ready_handles = set()

        try:
            for o in object_list:
                try:
                    fileno = getattr(o, 'fileno')
                except AttributeError:
                    # No fileno(): treat the object itself as a handle.
                    waithandle_to_obj[o.__index__()] = o
                else:
                    # start an overlapped read of length zero
                    try:
                        ov, err = _winapi.ReadFile(fileno(), 0, True)
                    except OSError as e:
                        ov, err = None, e.winerror
                        if err not in _ready_errors:
                            raise
                    if err == _winapi.ERROR_IO_PENDING:
                        ov_list.append(ov)
                        waithandle_to_obj[ov.event] = o
                    else:
                        # If o.fileno() is an overlapped pipe handle and
                        # err == 0 then there is a zero length message
                        # in the pipe, but it HAS NOT been consumed...
                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
                            # ... except on Windows 8 and later, where
                            # the message HAS been consumed.
                            try:
                                _, err = ov.GetOverlappedResult(False)
                            except OSError as e:
                                err = e.winerror
                            if not err and hasattr(o, '_got_empty_message'):
                                o._got_empty_message = True
                        ready_objects.add(o)
                        timeout = 0

            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
        finally:
            # request that overlapped reads stop
            for ov in ov_list:
                ov.cancel()

            # wait for all overlapped reads to stop
            for ov in ov_list:
                try:
                    _, err = ov.GetOverlappedResult(True)
                except OSError as e:
                    err = e.winerror
                    if err not in _ready_errors:
                        raise
                if err != _winapi.ERROR_OPERATION_ABORTED:
                    o = waithandle_to_obj[ov.event]
                    ready_objects.add(o)
                    if err == 0:
                        # If o.fileno() is an overlapped pipe handle then
                        # a zero length message HAS been consumed.
                        if hasattr(o, '_got_empty_message'):
                            o._got_empty_message = True

        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
        return [o for o in object_list if o in ready_objects]

else:

    import selectors

    # poll/select have the advantage of not requiring any extra file
    # descriptor, contrarily to epoll/kqueue (also, they require a single
    # syscall).
    if hasattr(selectors, 'PollSelector'):
        _WaitSelector = selectors.PollSelector
    else:
        _WaitSelector = selectors.SelectSelector

    def wait(object_list, timeout=None):
        '''
        Wait till an object in object_list is ready/readable.

        Returns list of those objects in object_list which are ready/readable.
        '''
        with _WaitSelector() as selector:
            for obj in object_list:
                selector.register(obj, selectors.EVENT_READ)

            if timeout is not None:
                deadline = time.monotonic() + timeout

            while True:
                ready = selector.select(timeout)
                if ready:
                    return [key.fileobj for (key, events) in ready]
                else:
                    # Nothing was ready: go round again with whatever time
                    # is left before the deadline.
                    if timeout is not None:
                        timeout = deadline - time.monotonic()
                        if timeout < 0:
                            return ready

#
# Make connection and socket objects shareable if possible
#

if sys.platform == 'win32':
    def reduce_connection(conn):
        handle = conn.fileno()
        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
            from . import resource_sharer
            ds = resource_sharer.DupSocket(s)
            return rebuild_connection, (ds, conn.readable, conn.writable)
    def rebuild_connection(ds, readable, writable):
        sock = ds.detach()
        return Connection(sock.detach(), readable, writable)
    reduction.register(Connection, reduce_connection)

    def reduce_pipe_connection(conn):
        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
        dh = reduction.DupHandle(conn.fileno(), access)
        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
    def rebuild_pipe_connection(dh, readable, writable):
        handle = dh.detach()
        return PipeConnection(handle, readable, writable)
    reduction.register(PipeConnection, reduce_pipe_connection)

else:
    def reduce_connection(conn):
        df = reduction.DupFd(conn.fileno())
        return rebuild_connection, (df, conn.readable, conn.writable)
    def rebuild_connection(df, readable, writable):
        fd = df.detach()
        return Connection(fd, readable, writable)
    reduction.register(Connection, reduce_connection)