#
# Module providing manager classes for dealing
# with shared objects
#
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]

#
# Imports
#

import sys
import threading
import signal
import array
import queue
import time
import types
import os
from os import getpid

from traceback import format_exc

from . import connection
from .context import reduction, get_spawning_popen, ProcessError
from . import pool
from . import process
from . import util
from . import get_context
try:
    from . import shared_memory
except ImportError:
    HAS_SHMEM = False
else:
    HAS_SHMEM = True
    __all__.append('SharedMemoryManager')

#
# Register some things for pickling
#

def reduce_array(a):
    '''
    Reduce an `array.array` to its constructor plus (typecode, raw bytes)
    '''
    return array.array, (a.typecode, a.tobytes())
reduction.register(array.array, reduce_array)

view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
def rebuild_as_list(obj):
    '''
    Reduce a dict view to a plain list holding the same elements
    '''
    return list, (list(obj),)
for view_type in view_types:
    reduction.register(view_type, rebuild_as_list)
del view_type, view_types

#
# Type for identifying shared objects
#

class Token(object):
    '''
    Type to uniquely identify a shared object
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        (self.typeid, self.address, self.id) = (typeid, address, id)

    def __getstate__(self):
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        (self.typeid, self.address, self.id) = state

    def __repr__(self):
        return '%s(typeid=%r, address=%r, id=%r)' % \
               (self.__class__.__name__, self.typeid, self.address, self.id)

#
# Function for communication with a manager's server process
#

def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The reply is a `(kind, result)` pair.  `result` is returned when `kind`
    is '#RETURN'; otherwise the exception built by `convert_to_error()` is
    raised.
    '''
    c.send((id, methodname, args, kwds))
    kind, result = c.recv()
    if kind == '#RETURN':
        return result
    raise convert_to_error(kind, result)

def convert_to_error(kind, result):
    '''
    Return (without raising) the exception matching a non-'#RETURN' reply

    '#ERROR' replies carry the exception object itself.  '#TRACEBACK' and
    '#UNSERIALIZABLE' replies carry a string which is wrapped in a
    `RemoteError`; a TypeError is raised if that payload is not a str.
    Any other kind yields a ValueError.
    '''
    if kind == '#ERROR':
        return result
    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        if not isinstance(result, str):
            raise TypeError(
                "Result {0!r} (kind '{1}') type is {2}, not str".format(
                    result, kind, type(result)))
        if kind == '#UNSERIALIZABLE':
            return RemoteError('Unserializable message: %s\n' % result)
        else:
            return RemoteError(result)
    else:
        return ValueError('Unrecognized message type {!r}'.format(kind))

class RemoteError(Exception):
    '''
    Error built from a traceback string sent back by the manager
    '''
    def __str__(self):
        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)

#
# Functions for finding the method names of an object
#

def all_methods(obj):
    '''
    Return a list of names of methods of `obj`
    '''
    temp = []
    for name in dir(obj):
        func = getattr(obj, name)
        if callable(func):
            temp.append(name)
    return temp

def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    return [name for name in all_methods(obj) if name[0] != '_']

#
# Server which is run in a process controlled by a manager
#

class Server(object):
    '''
    Server class which runs in a process controlled by a manager object
    '''
    # Names of the Server methods a client may invoke through
    # _handle_request(); anything else is rejected.
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        self.id_to_obj = {'0': (None, ())}
        self.id_to_refcount = {}
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def _handle_request(self, c):
        '''
        Authenticate `c`, run the single requested public method, and reply
        '''
        request = None
        try:
            connection.deliver_challenge(c, self.authkey)
            connection.answer_challenge(c, self.authkey)
            request = c.recv()
            ignore, funcname, args, kwds = request
            # Explicit check rather than `assert`: this whitelist decides
            # which server methods a client may call, so it must not be
            # stripped when Python runs with -O.
            if funcname not in self.public:
                raise AssertionError('%r unrecognized' % funcname)
            func = getattr(self, funcname)
        except Exception:
            msg = ('#TRACEBACK', format_exc())
        else:
            try:
                result = func(c, *args, **kwds)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                msg = ('#RETURN', result)

        try:
            c.send(msg)
        except Exception as e:
            try:
                c.send(('#TRACEBACK', format_exc()))
            except Exception:
                pass
            util.info('Failure to send message: %r', msg)
            util.info(' ... request was %r', request)
            util.info(' ... exception was %r', e)

    def handle_request(self, conn):
        '''
        Handle a new connection
        '''
        try:
            self._handle_request(conn)
        except SystemExit:
            # Server.serve_client() calls sys.exit(0) on EOF
            pass
        finally:
            conn.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # A method listed in method_to_typeid returns a proxy
                    # to its result instead of a copy of the result.
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        return obj

    def fallback_str(self, conn, ident, obj):
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        return repr(obj)

    # Used by serve_client() when the requested method raised
    # AttributeError (including "not in exposed").
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            keys = list(self.id_to_refcount.keys())
            keys.sort()
            for ident in keys:
                if ident != '0':
                    result.append(' %s: refcount=%s\n %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            import traceback
            traceback.print_exc()
        finally:
            # The stop event is set whether or not the reply could be sent.
            self.stop_event.set()

    def create(self, c, typeid, /, *args, **kwds):
        '''
        Create a new shared object and return its id
        '''
        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes self.mutex itself, so call it after releasing it.
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Increment the reference count of the object identified by `ident`
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    obj, exposed, gettypeid = self.id_to_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Decrement the reference count of `ident`, disposing of it at zero
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]


#
# Class to represent state of a manager
#

class State(object):
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2

#
# Mapping from serializer name to Listener and Client types
#

listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }

#
# Definition of BaseManager
#

class BaseManager(object):
    '''
    Base class for managers
    '''
    _registry = {}
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None, *, shutdown_timeout=1.0):
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()
        self._shutdown_timeout = shutdown_timeout
        # Only start() spawns a process; initialise the attribute here so
        # join() also works on a manager that was never started.
        self._process = None

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        # Use the class's _Server hook, as _run_server() does, so that a
        # subclass overriding _Server gets the same server type here.
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process
        '''
        Listener, Client = listener_client[self._serializer]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey, self._state,
                  self._Client, self._shutdown_timeout),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)
        '''
        if self._process is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client,
                          shutdown_timeout):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                pass

            # Escalate: polite shutdown, then terminate(), then kill().
            process.join(timeout=shutdown_timeout)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=shutdown_timeout)
                    if process.is_alive():
                        util.info('manager still alive after terminate')
                        process.kill()
                        process.join()

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type
        '''
        # Give each manager class its own registry the first time it
        # registers something, instead of mutating the inherited dict.
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # Server.create() took a reference and the proxy took its
                # own; drop the one left over from creation.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)

#
# Subclass of set which get cleared after a fork
#

class ProcessLocalSet(set):
    def __init__(self):
        util.register_after_fork(self, lambda obj: obj.clear())
    def __reduce__(self):
        return type(self), ()

#
# Definition of BaseProxy
#

class BaseProxy(object):
    '''
    A base for proxies of shared objects
    '''
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in the tls
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # Separate short-lived connection for the decref; the
            # per-thread connection in self._tls is left open.
            conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(conn, None, 'decref', (token.id,))
            finally:
                conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Take a reference on the referent and register `_decref` as finalizer
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release this process's reference to the object named by `token`
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"

#
# Function used for unpickling
#

def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.
    '''
    server = getattr(process.current_process(), '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        if token.id not in server.id_to_local_proxy_obj:
            server.id_to_local_proxy_obj[token.id] = \
                server.id_to_obj[token.id]
    incref = (
        kwds.pop('incref', True) and
        not getattr(process.current_process(), '_inheriting', False)
        )
    return func(token, serializer, incref=incref, **kwds)

#
# Functions to create proxies and proxy types
#

def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`
    '''
    exposed = tuple(exposed)
    try:
        return _cache[(name, exposed)]
    except KeyError:
        pass

    dic = {}

    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)

    ProxyType = type(name, (BaseProxy,), dic)
    ProxyType._exposed_ = exposed
    _cache[(name, exposed)] = ProxyType
    return ProxyType


def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`
    '''
    _Client = listener_client[serializer][1]

    # Resolve the authkey before any connection is made: the server runs
    # a challenge/response on every connection, so the 'get_methods'
    # query below needs the same key the proxy itself will use.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    proxy._isauto = True
    return proxy

#
# Types/callables which we will register with
# SyncManager
#

class Namespace(object):
    '''
    Simple attribute container; keyword arguments become attributes
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)
    def __repr__(self):
        # Snapshot the items first, then list the attributes whose names
        # do not start with '_' in sorted order.
        items = list(self.__dict__.items())
        temp = sorted('%s=%r' % (name, value)
                      for name, value in items
                      if not name.startswith('_'))
        return '%s(%s)' % (self.__class__.__name__, ', '.join(temp))

class Value(object):
    '''
    Holder for a single value plus the typecode it was created with

    The `lock` argument is accepted but not used by this class.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode = typecode
        self._value = value
    def get(self):
        return self._value
    def set(self, value):
        self._value = value
    def __repr__(self):
        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
    value = property(get, set)

def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`; `lock` is accepted but unused
    '''
    return array.array(typecode, sequence)

#
# Proxy types used by SyncManager
#

class IteratorProxy(BaseProxy):
    '''
    Proxy forwarding `__next__`, `send`, `throw` and `close` to the referent
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)


class AcquirerProxy(BaseProxy):
    '''
    Proxy for objects with `acquire`/`release`; usable as a context manager
    '''
    _exposed_ = ('acquire', 'release')
    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only forwarded when the caller supplied one.
        args = (blocking,) if timeout is None else (blocking, timeout)
        return self._callmethod('acquire', args)
    def release(self):
        return self._callmethod('release')
    def __enter__(self):
        return self._callmethod('acquire')
    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')


class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding `wait`, `notify`, `notify_all` and a local `wait_for`
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def notify(self, n=1):
        return self._callmethod('notify', (n,))
    def notify_all(self):
        return self._callmethod('notify_all')
    def wait_for(self, predicate, timeout=None):
        '''
        Wait until `predicate()` is true or `timeout` seconds have elapsed

        `predicate` is evaluated in the calling process; only `wait()` is
        forwarded to the referent.  Returns the last predicate value.
        '''
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result


class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait` to the referent
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))


class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier; the properties are read via `__getattribute__`
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))
    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))
    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))


class NamespaceProxy(BaseProxy):
    '''
    Proxy forwarding attribute access to the referent

    Names starting with '_' are handled on the proxy object itself; every
    other name is forwarded through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
    def __getattr__(self, key):
        if key[0] == '_':
            return object.__getattribute__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__getattribute__', (key,))
    def __setattr__(self, key, value):
        if key[0] == '_':
            return object.__setattr__(self, key, value)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__setattr__', (key, value))
    def __delattr__(self, key):
        if key[0] == '_':
            return object.__delattr__(self, key)
        callmethod = object.__getattribute__(self, '_callmethod')
        return callmethod('__delattr__', (key,))


class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get`/`set`, also exposed as the `value` property
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    value = property(get, set)

    __class_getitem__ = classmethod(types.GenericAlias)


BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    # The in-place operators return the proxy itself, so the target name
    # stays bound to the proxy after `+=` / `*=`.
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self


DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }


ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))


BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
class PoolProxy(BasePoolProxy):
    # Leaving the `with` block calls terminate() on the referent.
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

#
# Definition of SyncManager
#

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)

#
# Definition of SharedMemoryManager and SharedMemoryServer
#

if HAS_SHMEM:
    class _SharedMemoryTracker:
        "Manages one or more shared memory segments."
1239 1240 def __init__(self, name, segment_names=[]): 1241 self.shared_memory_context_name = name 1242 self.segment_names = segment_names 1243 1244 def register_segment(self, segment_name): 1245 "Adds the supplied shared memory block name to tracker." 1246 util.debug(f"Register segment {segment_name!r} in pid {getpid()}") 1247 self.segment_names.append(segment_name) 1248 1249 def destroy_segment(self, segment_name): 1250 """Calls unlink() on the shared memory block with the supplied name 1251 and removes it from the list of blocks being tracked.""" 1252 util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}") 1253 self.segment_names.remove(segment_name) 1254 segment = shared_memory.SharedMemory(segment_name) 1255 segment.close() 1256 segment.unlink() 1257 1258 def unlink(self): 1259 "Calls destroy_segment() on all tracked shared memory blocks." 1260 for segment_name in self.segment_names[:]: 1261 self.destroy_segment(segment_name) 1262 1263 def __del__(self): 1264 util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}") 1265 self.unlink() 1266 1267 def __getstate__(self): 1268 return (self.shared_memory_context_name, self.segment_names) 1269 1270 def __setstate__(self, state): 1271 self.__init__(*state) 1272 1273 1274 class SharedMemoryServer(Server): 1275 1276 public = Server.public + \ 1277 ['track_segment', 'release_segment', 'list_segments'] 1278 1279 def __init__(self, *args, **kwargs): 1280 Server.__init__(self, *args, **kwargs) 1281 address = self.address 1282 # The address of Linux abstract namespaces can be bytes 1283 if isinstance(address, bytes): 1284 address = os.fsdecode(address) 1285 self.shared_memory_context = \ 1286 _SharedMemoryTracker(f"shm_{address}_{getpid()}") 1287 util.debug(f"SharedMemoryServer started by pid {getpid()}") 1288 1289 def create(self, c, typeid, /, *args, **kwargs): 1290 """Create a new distributed-shared object (not backed by a shared 1291 memory block) and return its id to be used in a Proxy Object.""" 
1292 # Unless set up as a shared proxy, don't make shared_memory_context 1293 # a standard part of kwargs. This makes things easier for supplying 1294 # simple functions. 1295 if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"): 1296 kwargs['shared_memory_context'] = self.shared_memory_context 1297 return Server.create(self, c, typeid, *args, **kwargs) 1298 1299 def shutdown(self, c): 1300 "Call unlink() on all tracked shared memory, terminate the Server." 1301 self.shared_memory_context.unlink() 1302 return Server.shutdown(self, c) 1303 1304 def track_segment(self, c, segment_name): 1305 "Adds the supplied shared memory block name to Server's tracker." 1306 self.shared_memory_context.register_segment(segment_name) 1307 1308 def release_segment(self, c, segment_name): 1309 """Calls unlink() on the shared memory block with the supplied name 1310 and removes it from the tracker instance inside the Server.""" 1311 self.shared_memory_context.destroy_segment(segment_name) 1312 1313 def list_segments(self, c): 1314 """Returns a list of names of shared memory blocks that the Server 1315 is currently tracking.""" 1316 return self.shared_memory_context.segment_names 1317 1318 1319 class SharedMemoryManager(BaseManager): 1320 """Like SyncManager but uses SharedMemoryServer instead of Server. 1321 1322 It provides methods for creating and returning SharedMemory instances 1323 and for creating a list-like object (ShareableList) backed by shared 1324 memory. It also provides methods that create and return Proxy Objects 1325 that support synchronization across processes (i.e. multi-process-safe 1326 locks and semaphores). 
1327 """ 1328 1329 _Server = SharedMemoryServer 1330 1331 def __init__(self, *args, **kwargs): 1332 if os.name == "posix": 1333 # bpo-36867: Ensure the resource_tracker is running before 1334 # launching the manager process, so that concurrent 1335 # shared_memory manipulation both in the manager and in the 1336 # current process does not create two resource_tracker 1337 # processes. 1338 from . import resource_tracker 1339 resource_tracker.ensure_running() 1340 BaseManager.__init__(self, *args, **kwargs) 1341 util.debug(f"{self.__class__.__name__} created by pid {getpid()}") 1342 1343 def __del__(self): 1344 util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}") 1345 1346 def get_server(self): 1347 'Better than monkeypatching for now; merge into Server ultimately' 1348 if self._state.value != State.INITIAL: 1349 if self._state.value == State.STARTED: 1350 raise ProcessError("Already started SharedMemoryServer") 1351 elif self._state.value == State.SHUTDOWN: 1352 raise ProcessError("SharedMemoryManager has shut down") 1353 else: 1354 raise ProcessError( 1355 "Unknown state {!r}".format(self._state.value)) 1356 return self._Server(self._registry, self._address, 1357 self._authkey, self._serializer) 1358 1359 def SharedMemory(self, size): 1360 """Returns a new SharedMemory instance with the specified size in 1361 bytes, to be tracked by the manager.""" 1362 with self._Client(self._address, authkey=self._authkey) as conn: 1363 sms = shared_memory.SharedMemory(None, create=True, size=size) 1364 try: 1365 dispatch(conn, None, 'track_segment', (sms.name,)) 1366 except BaseException as e: 1367 sms.unlink() 1368 raise e 1369 return sms 1370 1371 def ShareableList(self, sequence): 1372 """Returns a new ShareableList instance populated with the values 1373 from the input sequence, to be tracked by the manager.""" 1374 with self._Client(self._address, authkey=self._authkey) as conn: 1375 sl = shared_memory.ShareableList(sequence) 1376 try: 1377 dispatch(conn, 
None, 'track_segment', (sl.shm.name,)) 1378 except BaseException as e: 1379 sl.shm.unlink() 1380 raise e 1381 return sl 1382