1#
2# Module providing manager classes for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# Licensed to PSF under a Contributor Agreement.
9#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
17import sys
18import threading
19import signal
20import array
21import queue
22import time
23import types
24import os
25from os import getpid
26
27from traceback import format_exc
28
29from . import connection
30from .context import reduction, get_spawning_popen, ProcessError
31from . import pool
32from . import process
33from . import util
34from . import get_context
35try:
36    from . import shared_memory
37except ImportError:
38    HAS_SHMEM = False
39else:
40    HAS_SHMEM = True
41    __all__.append('SharedMemoryManager')
42
43#
44# Register some things for pickling
45#
46
def reduce_array(a):
    '''
    Reduce an `array.array` to its constructor plus (typecode, raw bytes)
    '''
    ctor_args = (a.typecode, a.tobytes())
    return (array.array, ctor_args)
# register reduce_array as the reducer for array.array
reduction.register(array.array, reduce_array)

# the types of the objects returned by dict.items(), dict.keys() and
# dict.values(), obtained by calling each method on an empty dict
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
def rebuild_as_list(obj):
    '''
    Reduce an iterable (here: a dict view) to a plain list of its items
    '''
    items = [*obj]
    return (list, (items,))
# every dict view type is reduced with rebuild_as_list
for view_type in view_types:
    reduction.register(view_type, rebuild_as_list)
# remove the temporaries from the module namespace
del view_type, view_types
57
58#
59# Type for identifying shared objects
60#
61
class Token(object):
    '''
    Identifier of a shared object: a (typeid, address, id) triple
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # state is the plain triple, in slot order
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # unpack first so a malformed state leaves the token untouched
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        return '{}(typeid={!r}, address={!r}, id={!r})'.format(
            self.__class__.__name__, self.typeid, self.address, self.id)
80
81#
82# Function for communication with a manager's server process
83#
84
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send one request over connection `c` and return the manager's answer

    The request is the tuple `(id, methodname, args, kwds)`.  A '#RETURN'
    reply yields its payload; any other reply is converted to an exception
    and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
94
def convert_to_error(kind, result):
    '''
    Turn a non-'#RETURN' reply into the exception object to raise

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies must carry a str (TypeError
    is raised otherwise) and become a RemoteError.  Any other kind becomes
    a ValueError.
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
109
class RemoteError(Exception):
    '''
    Error reported by the manager process; args[0] is the remote text
    '''
    def __str__(self):
        # frame the remote text between two 75-character dashed rules
        rule = '-' * 75
        return ''.join(('\n', rule, '\n', str(self.args[0]), rule))
113
114#
115# Functions for finding the method names of an object
116#
117
def all_methods(obj):
    '''
    Return the names of all callable attributes of `obj`, in dir() order
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
128
def public_methods(obj):
    '''
    Return the names of the callable attributes of `obj` whose name does
    not begin with an underscore
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
134
135#
136# Server which is run in a process controlled by a manager
137#
138
139class Server(object):
140    '''
141    Server class which runs in a process controlled by a manager object
142    '''
143    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
144              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
145
146    def __init__(self, registry, address, authkey, serializer):
147        if not isinstance(authkey, bytes):
148            raise TypeError(
149                "Authkey {0!r} is type {1!s}, not bytes".format(
150                    authkey, type(authkey)))
151        self.registry = registry
152        self.authkey = process.AuthenticationString(authkey)
153        Listener, Client = listener_client[serializer]
154
155        # do authentication later
156        self.listener = Listener(address=address, backlog=16)
157        self.address = self.listener.address
158
159        self.id_to_obj = {'0': (None, ())}
160        self.id_to_refcount = {}
161        self.id_to_local_proxy_obj = {}
162        self.mutex = threading.Lock()
163
    def serve_forever(self):
        '''
        Run the server until the stop event is set, then exit

        Never returns normally: the `finally` clause always ends with
        `sys.exit(0)`.
        '''
        # set by shutdown() to end the wait loop below
        self.stop_event = threading.Event()
        # make this server reachable from the current process object
        process.current_process()._manager_server = self
        try:
            # connections are accepted on a daemon thread; this thread only
            # waits for the stop event
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)
185
186    def accepter(self):
187        while True:
188            try:
189                c = self.listener.accept()
190            except OSError:
191                continue
192            t = threading.Thread(target=self.handle_request, args=(c,))
193            t.daemon = True
194            t.start()
195
196    def _handle_request(self, c):
197        request = None
198        try:
199            connection.deliver_challenge(c, self.authkey)
200            connection.answer_challenge(c, self.authkey)
201            request = c.recv()
202            ignore, funcname, args, kwds = request
203            assert funcname in self.public, '%r unrecognized' % funcname
204            func = getattr(self, funcname)
205        except Exception:
206            msg = ('#TRACEBACK', format_exc())
207        else:
208            try:
209                result = func(c, *args, **kwds)
210            except Exception:
211                msg = ('#TRACEBACK', format_exc())
212            else:
213                msg = ('#RETURN', result)
214
215        try:
216            c.send(msg)
217        except Exception as e:
218            try:
219                c.send(('#TRACEBACK', format_exc()))
220            except Exception:
221                pass
222            util.info('Failure to send message: %r', msg)
223            util.info(' ... request was %r', request)
224            util.info(' ... exception was %r', e)
225
226    def handle_request(self, conn):
227        '''
228        Handle a new connection
229        '''
230        try:
231            self._handle_request(conn)
232        except SystemExit:
233            # Server.serve_client() calls sys.exit(0) on EOF
234            pass
235        finally:
236            conn.close()
237
    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Loops until the stop event is set.  Each request is a tuple
        `(ident, methodname, args, kwds)`; the reply is one of '#RETURN',
        '#PROXY', '#ERROR', '#TRACEBACK' or '#UNSERIALIZABLE'.  EOF on the
        connection ends the thread via `sys.exit(0)`.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                # reset so the AttributeError handler below can tell whether
                # the request was decoded before the failure
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # not a tracked object: try the table of objects that are
                    # only referenced from inside the manager
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # an exception raised by the referent is sent back as is
                    msg = ('#ERROR', e)
                else:
                    # results of methods listed in the method-to-typeid
                    # mapping are registered as new shared objects and
                    # answered with a proxy token instead of the value
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # unexposed or missing method: see whether one of the
                    # fallback handlers covers this method name
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    # the reply could not be sent; report that instead
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                # even the failure report could not be sent: give up on
                # this connection
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)
318
    # Fallback handlers: serve_client() looks the requested method name up in
    # `fallback_mapping` when handling the request raised AttributeError, and
    # calls the handler as `func(self, conn, ident, obj, *args, **kwds)`.

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Return the referent itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Return `str()` of the referent
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Return `repr()` of the referent
        '''
        return repr(obj)

    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing; BaseManager.connect() sends this request
        '''
        pass
336
337    def debug_info(self, c):
338        '''
339        Return some info --- useful to spot problems with refcounting
340        '''
341        # Perhaps include debug info about 'c'?
342        with self.mutex:
343            result = []
344            keys = list(self.id_to_refcount.keys())
345            keys.sort()
346            for ident in keys:
347                if ident != '0':
348                    result.append('  %s:       refcount=%s\n    %s' %
349                                  (ident, self.id_to_refcount[ident],
350                                   str(self.id_to_obj[ident][0])[:75]))
351            return '\n'.join(result)
352
    def number_of_objects(self, c):
        '''
        Number of shared objects

        Counts the entries of the refcount table; the connection `c` is
        not used.
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)
359
    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c`, then sets the stop event that
        serve_forever() is waiting on.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            # best effort: a failed acknowledgement is printed, and the
            # stop event is still set below
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()
372
373    def create(self, c, typeid, /, *args, **kwds):
374        '''
375        Create a new shared object and return its id
376        '''
377        with self.mutex:
378            callable, exposed, method_to_typeid, proxytype = \
379                      self.registry[typeid]
380
381            if callable is None:
382                if kwds or (len(args) != 1):
383                    raise ValueError(
384                        "Without callable, must have one non-keyword argument")
385                obj = args[0]
386            else:
387                obj = callable(*args, **kwds)
388
389            if exposed is None:
390                exposed = public_methods(obj)
391            if method_to_typeid is not None:
392                if not isinstance(method_to_typeid, dict):
393                    raise TypeError(
394                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
395                            method_to_typeid, type(method_to_typeid)))
396                exposed = list(exposed) + list(method_to_typeid)
397
398            ident = '%x' % id(obj)  # convert to string because xmlrpclib
399                                    # only has 32 bit signed integers
400            util.debug('%r callable returned object with id %r', typeid, ident)
401
402            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
403            if ident not in self.id_to_refcount:
404                self.id_to_refcount[ident] = 0
405
406        self.incref(c, ident)
407        return ident, tuple(exposed)
408
409    def get_methods(self, c, token):
410        '''
411        Return the methods of the shared object indicated by token
412        '''
413        return tuple(self.id_to_obj[token.id][1])
414
    def accept_connection(self, c, name):
        '''
        Serve this connection on the current thread

        Renames the current thread to `name`, acknowledges the request and
        then runs serve_client() on `c`; no new thread is started here.
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)
422
423    def incref(self, c, ident):
424        with self.mutex:
425            try:
426                self.id_to_refcount[ident] += 1
427            except KeyError as ke:
428                # If no external references exist but an internal (to the
429                # manager) still does and a new external reference is created
430                # from it, restore the manager's tracking of it from the
431                # previously stashed internal ref.
432                if ident in self.id_to_local_proxy_obj:
433                    self.id_to_refcount[ident] = 1
434                    self.id_to_obj[ident] = \
435                        self.id_to_local_proxy_obj[ident]
436                    obj, exposed, gettypeid = self.id_to_obj[ident]
437                    util.debug('Server re-enabled tracking & INCREF %r', ident)
438                else:
439                    raise ke
440
    def decref(self, c, ident):
        '''
        Drop one reference to the shared object `ident`

        When the count reaches zero the object is removed from both the
        refcount table and the object table.  Raises AssertionError if the
        recorded refcount is not positive, KeyError if `ident` is unknown.
        '''
        # objects known only through the manager-internal proxy table are not
        # refcounted, so there is nothing to release (checked without the
        # mutex)
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
467
468
469#
470# Class to represent state of a manager
471#
472
class State(object):
    # `value` holds one of the constants below.  BaseManager sets it to
    # INITIAL on construction, to STARTED in start() and connect(), and to
    # SHUTDOWN in _finalize_manager().
    __slots__ = ['value']
    INITIAL = 0
    STARTED = 1
    SHUTDOWN = 2
478
479#
480# Mapping from serializer name to Listener and Client types
481#
482
# keyed by the `serializer` argument; each value is a (Listener, Client) pair
listener_client = {
    'pickle' : (connection.Listener, connection.Client),
    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
    }
487
488#
489# Definition of BaseManager
490#
491
492class BaseManager(object):
493    '''
494    Base class for managers
495    '''
496    _registry = {}
497    _Server = Server
498
499    def __init__(self, address=None, authkey=None, serializer='pickle',
500                 ctx=None, *, shutdown_timeout=1.0):
501        if authkey is None:
502            authkey = process.current_process().authkey
503        self._address = address     # XXX not final address if eg ('', 0)
504        self._authkey = process.AuthenticationString(authkey)
505        self._state = State()
506        self._state.value = State.INITIAL
507        self._serializer = serializer
508        self._Listener, self._Client = listener_client[serializer]
509        self._ctx = ctx or get_context()
510        self._shutdown_timeout = shutdown_timeout
511
512    def get_server(self):
513        '''
514        Return server object with serve_forever() method and address attribute
515        '''
516        if self._state.value != State.INITIAL:
517            if self._state.value == State.STARTED:
518                raise ProcessError("Already started server")
519            elif self._state.value == State.SHUTDOWN:
520                raise ProcessError("Manager has shut down")
521            else:
522                raise ProcessError(
523                    "Unknown state {!r}".format(self._state.value))
524        return Server(self._registry, self._address,
525                      self._authkey, self._serializer)
526
527    def connect(self):
528        '''
529        Connect manager object to the server process
530        '''
531        Listener, Client = listener_client[self._serializer]
532        conn = Client(self._address, authkey=self._authkey)
533        dispatch(conn, None, 'dummy')
534        self._state.value = State.STARTED
535
    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        `initializer(*initargs)`, if given, is called in the server process
        before the server is created.  Raises ProcessError unless the
        manager is in its initial state, TypeError if `initializer` is not
        callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        # name the process after the manager class and the process identity
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        # `shutdown` becomes an instance attribute here; __exit__ calls it
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey, self._state,
                  self._Client, self._shutdown_timeout),
            exitpriority=0
            )
578
    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it

        This is the target of the process spawned by start().  `writer` is
        the connection on which the server's address is sent back.
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()
601
602    def _create(self, typeid, /, *args, **kwds):
603        '''
604        Create a new shared object; return the token and exposed tuple
605        '''
606        assert self._state.value == State.STARTED, 'server not yet started'
607        conn = self._Client(self._address, authkey=self._authkey)
608        try:
609            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
610        finally:
611            conn.close()
612        return Token(typeid, self._address, id), exposed
613
614    def join(self, timeout=None):
615        '''
616        Join the manager process (if it has been spawned)
617        '''
618        if self._process is not None:
619            self._process.join(timeout)
620            if not self._process.is_alive():
621                self._process = None
622
623    def _debug_info(self):
624        '''
625        Return some info about the servers shared objects and connections
626        '''
627        conn = self._Client(self._address, authkey=self._authkey)
628        try:
629            return dispatch(conn, None, 'debug_info')
630        finally:
631            conn.close()
632
633    def _number_of_objects(self):
634        '''
635        Return the number of shared objects
636        '''
637        conn = self._Client(self._address, authkey=self._authkey)
638        try:
639            return dispatch(conn, None, 'number_of_objects')
640        finally:
641            conn.close()
642
643    def __enter__(self):
644        if self._state.value == State.INITIAL:
645            self.start()
646        if self._state.value != State.STARTED:
647            if self._state.value == State.INITIAL:
648                raise ProcessError("Unable to start server")
649            elif self._state.value == State.SHUTDOWN:
650                raise ProcessError("Manager has shut down")
651            else:
652                raise ProcessError(
653                    "Unknown state {!r}".format(self._state.value))
654        return self
655
    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer that start() stores on the instance
        self.shutdown()
658
    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client,
                          shutdown_timeout):
        '''
        Shutdown the manager process; will be registered as a finalizer

        Escalates step by step: a 'shutdown' request, then `terminate()`,
        then `kill()`, waiting up to `shutdown_timeout` after each of the
        first two.  Finally marks `state` as SHUTDOWN and drops the
        per-address proxy bookkeeping.
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: if the request fails, fall through to the
                # join/terminate/kill escalation below
                pass

            process.join(timeout=shutdown_timeout)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=shutdown_timeout)
                    if process.is_alive():
                        util.info('manager still alive after terminate')
                        process.kill()
                        process.join()

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass
693
    @property
    def address(self):
        # the address given to __init__ until start() replaces it with the
        # address reported by the server process
        return self._address
697
698    @classmethod
699    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
700                 method_to_typeid=None, create_method=True):
701        '''
702        Register a typeid with the manager type
703        '''
704        if '_registry' not in cls.__dict__:
705            cls._registry = cls._registry.copy()
706
707        if proxytype is None:
708            proxytype = AutoProxy
709
710        exposed = exposed or getattr(proxytype, '_exposed_', None)
711
712        method_to_typeid = method_to_typeid or \
713                           getattr(proxytype, '_method_to_typeid_', None)
714
715        if method_to_typeid:
716            for key, value in list(method_to_typeid.items()): # isinstance?
717                assert type(key) is str, '%r is not a string' % key
718                assert type(value) is str, '%r is not a string' % value
719
720        cls._registry[typeid] = (
721            callable, exposed, method_to_typeid, proxytype
722            )
723
724        if create_method:
725            def temp(self, /, *args, **kwds):
726                util.debug('requesting creation of a shared %r object', typeid)
727                token, exp = self._create(typeid, *args, **kwds)
728                proxy = proxytype(
729                    token, self._serializer, manager=self,
730                    authkey=self._authkey, exposed=exp
731                    )
732                conn = self._Client(token.address, authkey=self._authkey)
733                dispatch(conn, None, 'decref', (token.id,))
734                return proxy
735            temp.__name__ = typeid
736            setattr(cls, typeid, temp)
737
738#
739# Subclass of set which get cleared after a fork
740#
741
class ProcessLocalSet(set):
    def __init__(self):
        # registers a callback that empties the set after a fork
        util.register_after_fork(self, lambda obj: obj.clear())
    def __reduce__(self):
        # reduced to an empty set of the same type: contents are not kept
        return type(self), ()
747
748#
749# Definition of BaseProxy
750#
751
752class BaseProxy(object):
753    '''
754    A base for proxies of shared objects
755    '''
756    _address_to_local = {}
757    _mutex = util.ForkAwareThreadLock()
758
    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Create a proxy for the shared object identified by `token`

        `authkey` falls back to the manager's authkey and then to the
        current process's authkey.  Unless `incref` is false, a reference
        to the referent is taken immediately via `_incref()`.  `exposed`
        is not used here.
        '''
        # all proxies for the same manager address share one
        # (thread-local storage, id set) pair, created on first use
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)
799
800    def _connect(self):
801        util.debug('making connection to manager')
802        name = process.current_process().name
803        if threading.current_thread().name != 'MainThread':
804            name += '|' + threading.current_thread().name
805        conn = self._Client(self._token.address, authkey=self._authkey)
806        dispatch(conn, None, 'accept_connection', (name,))
807        self._tls.connection = conn
808
    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy object instead; any
        other non-'#RETURN' reply is converted to an exception and raised.
        '''
        # each thread lazily opens its own connection to the manager
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            # NOTE(review): self._manager may be None (the __init__ default,
            # and _after_fork() resets it), in which case this lookup raises
            # AttributeError -- confirm whether that path can be reached
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # the new proxy holds its own reference now, so release the one
            # the server took when it created the object
            conn = self._Client(token.address, authkey=self._authkey)
            dispatch(conn, None, 'decref', (token.id,))
            return proxy
        raise convert_to_error(kind, result)
838
    def _getvalue(self):
        '''
        Get a copy of the value of the referent

        Sends the '#GETVALUE' request through `_callmethod`.
        '''
        return self._callmethod('#GETVALUE')
844
845    def _incref(self):
846        if self._owned_by_manager:
847            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
848            return
849
850        conn = self._Client(self._token.address, authkey=self._authkey)
851        dispatch(conn, None, 'incref', (self._id,))
852        util.debug('INCREF %r', self._token.id)
853
854        self._idset.add(self._id)
855
856        state = self._manager and self._manager._state
857
858        self._close = util.Finalize(
859            self, BaseProxy._decref,
860            args=(self._token, self._authkey, state,
861                  self._tls, self._idset, self._Client),
862            exitpriority=10
863            )
864
    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release this process's reference to the object behind `token`

        Registered by `_incref()` as the proxy's finalizer.  A failure to
        reach the manager is only logged.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                dispatch(conn, None, 'decref', (token.id,))
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection
889
890    def _after_fork(self):
891        self._manager = None
892        try:
893            self._incref()
894        except Exception as e:
895            # the proxy may just be for a manager which has shutdown
896            util.info('incref failed: %s' % e)
897
898    def __reduce__(self):
899        kwds = {}
900        if get_spawning_popen() is not None:
901            kwds['authkey'] = self._authkey
902
903        if getattr(self, '_isauto', False):
904            kwds['exposed'] = self._exposed_
905            return (RebuildProxy,
906                    (AutoProxy, self._token, self._serializer, kwds))
907        else:
908            return (RebuildProxy,
909                    (type(self), self._token, self._serializer, kwds))
910
911    def __deepcopy__(self, memo):
912        return self._getvalue()
913
914    def __repr__(self):
915        return '<%s object, typeid %r at %#x>' % \
916               (type(self).__name__, self._token.typeid, id(self))
917
918    def __str__(self):
919        '''
920        Return representation of the referent (or a fall-back if that fails)
921        '''
922        try:
923            return self._callmethod('__repr__')
924        except Exception:
925            return repr(self)[:-1] + "; '__str__()' failed>"
926
927#
928# Function used for unpickling
929#
930
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Calls `func(token, serializer, incref=..., **kwds)`.  `kwds` is
    modified in place: 'incref' is popped from it, and 'manager_owned'
    is set when the current process runs the manager server that the
    token's address points at.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)

    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # incref only if requested and the process is not flagged as
    # `_inheriting`
    requested = kwds.pop('incref', True)
    incref = requested and not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
947
948#
949# Functions to create proxies and proxy types
950#
951
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    The result is a subclass of `BaseProxy` called `name`; every exposed
    method forwards its arguments to `self._callmethod()`.  Types are
    memoized in `_cache` under the key `(name, tuple(exposed))`.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # generate one forwarding method per exposed name
    namespace = {}
    for meth in exposed:
        exec('''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)''' % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
972
973
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    When `authkey` is None it defaults to the manager's `_authkey` (if a
    manager is given) and otherwise to the current process's authkey.
    When `exposed` is None the method names are requested from the
    address in `token` with a 'get_methods' call.  The returned proxy is
    an instance of a type built by `MakeProxyType` and has `_isauto` set.
    '''
    _Client = listener_client[serializer][1]

    # Resolve the key before it is needed: the 'get_methods' query below
    # must use the same authkey as the proxy that is about to be built.
    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marks the proxy so that pickling rebuilds it through AutoProxy
    proxy._isauto = True
    return proxy
998
999#
1000# Types/callables which we will register with SyncManager
1001#
1002
class Namespace(object):
    '''
    Plain attribute container: keyword arguments become attributes
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # Snapshot the attributes, render the ones whose name does not
        # start with '_' and sort the rendered 'name=value' strings.
        snapshot = list(self.__dict__.items())
        rendered = sorted(
            '%s=%r' % (name, value)
            for name, value in snapshot
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(rendered))
1014
class Value(object):
    '''
    Holder for a single value together with its typecode
    '''
    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class
        self._typecode, self._value = typecode, value

    def get(self):
        '''Return the stored value'''
        return self._value

    def set(self, value):
        '''Replace the stored value'''
        self._value = value

    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields

    # attribute-style access on top of get()/set()
    value = property(get, set)
1026
def Array(typecode, sequence, lock=True):
    '''
    Return an `array.array` of `typecode` initialised from `sequence`

    `lock` is accepted but not used.
    '''
    result = array.array(typecode, sequence)
    return result
1029
1030#
1031# Proxy types used by SyncManager
1032#
1033
class IteratorProxy(BaseProxy):
    '''
    Proxy exposing `__next__`, `send`, `throw` and `close`; every call is
    forwarded to the referent through `_callmethod()`
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        # the proxy is its own iterator
        return self

    def __next__(self, *call_args):
        return self._callmethod('__next__', call_args)

    def send(self, *call_args):
        return self._callmethod('send', call_args)

    def throw(self, *call_args):
        return self._callmethod('throw', call_args)

    def close(self, *call_args):
        return self._callmethod('close', call_args)
1046
1047
class AcquirerProxy(BaseProxy):
    '''
    Proxy exposing `acquire` and `release`; also usable in a `with`
    statement
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # a timeout of None is not forwarded at all
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # forwards 'acquire' without arguments
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1059
1060
class ConditionProxy(AcquirerProxy):
    '''
    Proxy adding `wait`, `notify` and `notify_all` to `AcquirerProxy`,
    plus a `wait_for()` loop that evaluates the predicate locally
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call `wait()` repeatedly until `predicate()` is true or `timeout`
        seconds have elapsed; return the last value of `predicate()`
        '''
        result = predicate()
        if result:
            return result

        # without a timeout there is no deadline and wait() gets None
        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not result:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            result = predicate()
        return result
1086
1087
class EventProxy(BaseProxy):
    '''
    Proxy exposing `is_set`, `set`, `clear` and `wait`
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        return self._callmethod('is_set')

    def set(self):
        return self._callmethod('set')

    def clear(self):
        return self._callmethod('clear')

    def wait(self, timeout=None):
        # the timeout is always forwarded, even when it is None
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)
1098
1099
class BarrierProxy(BaseProxy):
    '''
    Proxy exposing `wait`, `abort` and `reset`; the read-only properties
    are fetched from the referent with `__getattribute__`
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        wait_args = (timeout,)
        return self._callmethod('wait', wait_args)

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        attr_args = ('parties',)
        return self._callmethod('__getattribute__', attr_args)

    @property
    def n_waiting(self):
        attr_args = ('n_waiting',)
        return self._callmethod('__getattribute__', attr_args)

    @property
    def broken(self):
        attr_args = ('broken',)
        return self._callmethod('__getattribute__', attr_args)
1117
1118
class NamespaceProxy(BaseProxy):
    '''
    Proxy forwarding attribute access to the referent

    Names starting with '_' are handled on the proxy object itself; all
    other names are read, set or deleted on the referent.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__delattr__', (key,))
        return object.__delattr__(self, key)
1136
1137
class ValueProxy(BaseProxy):
    '''
    Proxy exposing `get` and `set`, with a `value` property on top
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        return self._callmethod('get')

    def set(self, value):
        set_args = (value,)
        return self._callmethod('set', set_args)

    # attribute-style access built from the two forwarding methods
    value = property(get, set)

    # allows subscripting the class, e.g. ValueProxy[int]
    __class_getitem__ = classmethod(types.GenericAlias)
1147
1148
# Generated base type holding the plain forwarding methods of a list proxy
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself
    '''
    def __iadd__(self, value):
        # `proxy += value` is carried out as an 'extend' on the referent
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self
1162
1163
# Generated proxy type for dicts
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
        '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# the result of __iter__ is handed back under the 'Iterator' typeid
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
1172
1173
# Generated proxy type for arrays: length and item access only
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
1177
1178
# Generated base type holding the forwarding methods of a pool proxy
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
        'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ),
)
# typeids under which the results of these methods are handed back
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager; leaving the block calls
    `terminate()`
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
1195
1196#
1197# Definition of SyncManager
1198#
1199
1200class SyncManager(BaseManager):
1201    '''
1202    Subclass of `BaseManager` which supports a number of shared object types.
1203
1204    The types registered are those intended for the synchronization
1205    of threads, plus `dict`, `list` and `Namespace`.
1206
1207    The `multiprocessing.Manager()` function creates started instances of
1208    this class.
1209    '''
1210
1211SyncManager.register('Queue', queue.Queue)
1212SyncManager.register('JoinableQueue', queue.Queue)
1213SyncManager.register('Event', threading.Event, EventProxy)
1214SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1215SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1216SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1217SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1218                     AcquirerProxy)
1219SyncManager.register('Condition', threading.Condition, ConditionProxy)
1220SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
1221SyncManager.register('Pool', pool.Pool, PoolProxy)
1222SyncManager.register('list', list, ListProxy)
1223SyncManager.register('dict', dict, DictProxy)
1224SyncManager.register('Value', Value, ValueProxy)
1225SyncManager.register('Array', Array, ArrayProxy)
1226SyncManager.register('Namespace', Namespace, NamespaceProxy)
1227
1228# types returned by methods of PoolProxy
1229SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1230SyncManager.register('AsyncResult', create_method=False)
1231
1232#
1233# Definition of SharedMemoryManager and SharedMemoryServer
1234#
1235
1236if HAS_SHMEM:
1237    class _SharedMemoryTracker:
1238        "Manages one or more shared memory segments."
1239
1240        def __init__(self, name, segment_names=[]):
1241            self.shared_memory_context_name = name
1242            self.segment_names = segment_names
1243
1244        def register_segment(self, segment_name):
1245            "Adds the supplied shared memory block name to tracker."
1246            util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1247            self.segment_names.append(segment_name)
1248
1249        def destroy_segment(self, segment_name):
1250            """Calls unlink() on the shared memory block with the supplied name
1251            and removes it from the list of blocks being tracked."""
1252            util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1253            self.segment_names.remove(segment_name)
1254            segment = shared_memory.SharedMemory(segment_name)
1255            segment.close()
1256            segment.unlink()
1257
1258        def unlink(self):
1259            "Calls destroy_segment() on all tracked shared memory blocks."
1260            for segment_name in self.segment_names[:]:
1261                self.destroy_segment(segment_name)
1262
1263        def __del__(self):
1264            util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1265            self.unlink()
1266
1267        def __getstate__(self):
1268            return (self.shared_memory_context_name, self.segment_names)
1269
1270        def __setstate__(self, state):
1271            self.__init__(*state)
1272
1273
1274    class SharedMemoryServer(Server):
1275
1276        public = Server.public + \
1277                 ['track_segment', 'release_segment', 'list_segments']
1278
1279        def __init__(self, *args, **kwargs):
1280            Server.__init__(self, *args, **kwargs)
1281            address = self.address
1282            # The address of Linux abstract namespaces can be bytes
1283            if isinstance(address, bytes):
1284                address = os.fsdecode(address)
1285            self.shared_memory_context = \
1286                _SharedMemoryTracker(f"shm_{address}_{getpid()}")
1287            util.debug(f"SharedMemoryServer started by pid {getpid()}")
1288
1289        def create(self, c, typeid, /, *args, **kwargs):
1290            """Create a new distributed-shared object (not backed by a shared
1291            memory block) and return its id to be used in a Proxy Object."""
1292            # Unless set up as a shared proxy, don't make shared_memory_context
1293            # a standard part of kwargs.  This makes things easier for supplying
1294            # simple functions.
1295            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1296                kwargs['shared_memory_context'] = self.shared_memory_context
1297            return Server.create(self, c, typeid, *args, **kwargs)
1298
1299        def shutdown(self, c):
1300            "Call unlink() on all tracked shared memory, terminate the Server."
1301            self.shared_memory_context.unlink()
1302            return Server.shutdown(self, c)
1303
1304        def track_segment(self, c, segment_name):
1305            "Adds the supplied shared memory block name to Server's tracker."
1306            self.shared_memory_context.register_segment(segment_name)
1307
1308        def release_segment(self, c, segment_name):
1309            """Calls unlink() on the shared memory block with the supplied name
1310            and removes it from the tracker instance inside the Server."""
1311            self.shared_memory_context.destroy_segment(segment_name)
1312
1313        def list_segments(self, c):
1314            """Returns a list of names of shared memory blocks that the Server
1315            is currently tracking."""
1316            return self.shared_memory_context.segment_names
1317
1318
1319    class SharedMemoryManager(BaseManager):
1320        """Like SyncManager but uses SharedMemoryServer instead of Server.
1321
1322        It provides methods for creating and returning SharedMemory instances
1323        and for creating a list-like object (ShareableList) backed by shared
1324        memory.  It also provides methods that create and return Proxy Objects
1325        that support synchronization across processes (i.e. multi-process-safe
1326        locks and semaphores).
1327        """
1328
1329        _Server = SharedMemoryServer
1330
1331        def __init__(self, *args, **kwargs):
1332            if os.name == "posix":
1333                # bpo-36867: Ensure the resource_tracker is running before
1334                # launching the manager process, so that concurrent
1335                # shared_memory manipulation both in the manager and in the
1336                # current process does not create two resource_tracker
1337                # processes.
1338                from . import resource_tracker
1339                resource_tracker.ensure_running()
1340            BaseManager.__init__(self, *args, **kwargs)
1341            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1342
1343        def __del__(self):
1344            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1345
1346        def get_server(self):
1347            'Better than monkeypatching for now; merge into Server ultimately'
1348            if self._state.value != State.INITIAL:
1349                if self._state.value == State.STARTED:
1350                    raise ProcessError("Already started SharedMemoryServer")
1351                elif self._state.value == State.SHUTDOWN:
1352                    raise ProcessError("SharedMemoryManager has shut down")
1353                else:
1354                    raise ProcessError(
1355                        "Unknown state {!r}".format(self._state.value))
1356            return self._Server(self._registry, self._address,
1357                                self._authkey, self._serializer)
1358
1359        def SharedMemory(self, size):
1360            """Returns a new SharedMemory instance with the specified size in
1361            bytes, to be tracked by the manager."""
1362            with self._Client(self._address, authkey=self._authkey) as conn:
1363                sms = shared_memory.SharedMemory(None, create=True, size=size)
1364                try:
1365                    dispatch(conn, None, 'track_segment', (sms.name,))
1366                except BaseException as e:
1367                    sms.unlink()
1368                    raise e
1369            return sms
1370
1371        def ShareableList(self, sequence):
1372            """Returns a new ShareableList instance populated with the values
1373            from the input sequence, to be tracked by the manager."""
1374            with self._Client(self._address, authkey=self._authkey) as conn:
1375                sl = shared_memory.ShareableList(sequence)
1376                try:
1377                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
1378                except BaseException as e:
1379                    sl.shm.unlink()
1380                    raise e
1381            return sl
1382