1*cda5da8dSAndroid Build Coastguard Workerfrom _weakrefset import WeakSet 2*cda5da8dSAndroid Build Coastguard Worker 3*cda5da8dSAndroid Build Coastguard Worker 4*cda5da8dSAndroid Build Coastguard Workerdef get_cache_token(): 5*cda5da8dSAndroid Build Coastguard Worker """Returns the current ABC cache token. 6*cda5da8dSAndroid Build Coastguard Worker 7*cda5da8dSAndroid Build Coastguard Worker The token is an opaque object (supporting equality testing) identifying the 8*cda5da8dSAndroid Build Coastguard Worker current version of the ABC cache for virtual subclasses. The token changes 9*cda5da8dSAndroid Build Coastguard Worker with every call to ``register()`` on any ABC. 10*cda5da8dSAndroid Build Coastguard Worker """ 11*cda5da8dSAndroid Build Coastguard Worker return ABCMeta._abc_invalidation_counter 12*cda5da8dSAndroid Build Coastguard Worker 13*cda5da8dSAndroid Build Coastguard Worker 14*cda5da8dSAndroid Build Coastguard Workerclass ABCMeta(type): 15*cda5da8dSAndroid Build Coastguard Worker """Metaclass for defining Abstract Base Classes (ABCs). 16*cda5da8dSAndroid Build Coastguard Worker 17*cda5da8dSAndroid Build Coastguard Worker Use this metaclass to create an ABC. An ABC can be subclassed 18*cda5da8dSAndroid Build Coastguard Worker directly, and then acts as a mix-in class. You can also register 19*cda5da8dSAndroid Build Coastguard Worker unrelated concrete classes (even built-in classes) and unrelated 20*cda5da8dSAndroid Build Coastguard Worker ABCs as 'virtual subclasses' -- these and their descendants will 21*cda5da8dSAndroid Build Coastguard Worker be considered subclasses of the registering ABC by the built-in 22*cda5da8dSAndroid Build Coastguard Worker issubclass() function, but the registering ABC won't show up in 23*cda5da8dSAndroid Build Coastguard Worker their MRO (Method Resolution Order) nor will method 24*cda5da8dSAndroid Build Coastguard Worker implementations defined by the registering ABC be callable (not 25*cda5da8dSAndroid Build Coastguard Worker even via super()). 26*cda5da8dSAndroid Build Coastguard Worker """ 27*cda5da8dSAndroid Build Coastguard Worker 28*cda5da8dSAndroid Build Coastguard Worker # A global counter that is incremented each time a class is 29*cda5da8dSAndroid Build Coastguard Worker # registered as a virtual subclass of anything. It forces the 30*cda5da8dSAndroid Build Coastguard Worker # negative cache to be cleared before its next use. 31*cda5da8dSAndroid Build Coastguard Worker # Note: this counter is private. Use `abc.get_cache_token()` for 32*cda5da8dSAndroid Build Coastguard Worker # external code. 33*cda5da8dSAndroid Build Coastguard Worker _abc_invalidation_counter = 0 34*cda5da8dSAndroid Build Coastguard Worker 35*cda5da8dSAndroid Build Coastguard Worker def __new__(mcls, name, bases, namespace, /, **kwargs): 36*cda5da8dSAndroid Build Coastguard Worker cls = super().__new__(mcls, name, bases, namespace, **kwargs) 37*cda5da8dSAndroid Build Coastguard Worker # Compute set of abstract method names 38*cda5da8dSAndroid Build Coastguard Worker abstracts = {name 39*cda5da8dSAndroid Build Coastguard Worker for name, value in namespace.items() 40*cda5da8dSAndroid Build Coastguard Worker if getattr(value, "__isabstractmethod__", False)} 41*cda5da8dSAndroid Build Coastguard Worker for base in bases: 42*cda5da8dSAndroid Build Coastguard Worker for name in getattr(base, "__abstractmethods__", set()): 43*cda5da8dSAndroid Build Coastguard Worker value = getattr(cls, name, None) 44*cda5da8dSAndroid Build Coastguard Worker if getattr(value, "__isabstractmethod__", False): 45*cda5da8dSAndroid Build Coastguard Worker abstracts.add(name) 46*cda5da8dSAndroid Build Coastguard Worker cls.__abstractmethods__ = frozenset(abstracts) 47*cda5da8dSAndroid Build Coastguard Worker # Set up inheritance registry 48*cda5da8dSAndroid Build Coastguard Worker cls._abc_registry = WeakSet() 49*cda5da8dSAndroid Build Coastguard Worker cls._abc_cache = WeakSet() 50*cda5da8dSAndroid Build Coastguard Worker cls._abc_negative_cache = WeakSet() 51*cda5da8dSAndroid Build Coastguard Worker cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter 52*cda5da8dSAndroid Build Coastguard Worker return cls 53*cda5da8dSAndroid Build Coastguard Worker 54*cda5da8dSAndroid Build Coastguard Worker def register(cls, subclass): 55*cda5da8dSAndroid Build Coastguard Worker """Register a virtual subclass of an ABC. 56*cda5da8dSAndroid Build Coastguard Worker 57*cda5da8dSAndroid Build Coastguard Worker Returns the subclass, to allow usage as a class decorator. 58*cda5da8dSAndroid Build Coastguard Worker """ 59*cda5da8dSAndroid Build Coastguard Worker if not isinstance(subclass, type): 60*cda5da8dSAndroid Build Coastguard Worker raise TypeError("Can only register classes") 61*cda5da8dSAndroid Build Coastguard Worker if issubclass(subclass, cls): 62*cda5da8dSAndroid Build Coastguard Worker return subclass # Already a subclass 63*cda5da8dSAndroid Build Coastguard Worker # Subtle: test for cycles *after* testing for "already a subclass"; 64*cda5da8dSAndroid Build Coastguard Worker # this means we allow X.register(X) and interpret it as a no-op. 65*cda5da8dSAndroid Build Coastguard Worker if issubclass(cls, subclass): 66*cda5da8dSAndroid Build Coastguard Worker # This would create a cycle, which is bad for the algorithm below 67*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError("Refusing to create an inheritance cycle") 68*cda5da8dSAndroid Build Coastguard Worker cls._abc_registry.add(subclass) 69*cda5da8dSAndroid Build Coastguard Worker ABCMeta._abc_invalidation_counter += 1 # Invalidate negative cache 70*cda5da8dSAndroid Build Coastguard Worker return subclass 71*cda5da8dSAndroid Build Coastguard Worker 72*cda5da8dSAndroid Build Coastguard Worker def _dump_registry(cls, file=None): 73*cda5da8dSAndroid Build Coastguard Worker """Debug helper to print the ABC registry.""" 74*cda5da8dSAndroid Build Coastguard Worker print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file) 75*cda5da8dSAndroid Build Coastguard Worker print(f"Inv. counter: {get_cache_token()}", file=file) 76*cda5da8dSAndroid Build Coastguard Worker for name in cls.__dict__: 77*cda5da8dSAndroid Build Coastguard Worker if name.startswith("_abc_"): 78*cda5da8dSAndroid Build Coastguard Worker value = getattr(cls, name) 79*cda5da8dSAndroid Build Coastguard Worker if isinstance(value, WeakSet): 80*cda5da8dSAndroid Build Coastguard Worker value = set(value) 81*cda5da8dSAndroid Build Coastguard Worker print(f"{name}: {value!r}", file=file) 82*cda5da8dSAndroid Build Coastguard Worker 83*cda5da8dSAndroid Build Coastguard Worker def _abc_registry_clear(cls): 84*cda5da8dSAndroid Build Coastguard Worker """Clear the registry (for debugging or testing).""" 85*cda5da8dSAndroid Build Coastguard Worker cls._abc_registry.clear() 86*cda5da8dSAndroid Build Coastguard Worker 87*cda5da8dSAndroid Build Coastguard Worker def _abc_caches_clear(cls): 88*cda5da8dSAndroid Build Coastguard Worker """Clear the caches (for debugging or testing).""" 89*cda5da8dSAndroid Build Coastguard Worker cls._abc_cache.clear() 90*cda5da8dSAndroid Build Coastguard Worker cls._abc_negative_cache.clear() 91*cda5da8dSAndroid Build Coastguard Worker 92*cda5da8dSAndroid Build Coastguard Worker def __instancecheck__(cls, instance): 93*cda5da8dSAndroid Build Coastguard Worker """Override for isinstance(instance, cls).""" 94*cda5da8dSAndroid Build Coastguard Worker # Inline the cache checking 95*cda5da8dSAndroid Build Coastguard Worker subclass = instance.__class__ 96*cda5da8dSAndroid Build Coastguard Worker if subclass in cls._abc_cache: 97*cda5da8dSAndroid Build Coastguard Worker return True 98*cda5da8dSAndroid Build Coastguard Worker subtype = type(instance) 99*cda5da8dSAndroid Build Coastguard Worker if subtype is subclass: 100*cda5da8dSAndroid Build Coastguard Worker if (cls._abc_negative_cache_version == 101*cda5da8dSAndroid Build Coastguard Worker ABCMeta._abc_invalidation_counter and 102*cda5da8dSAndroid Build Coastguard Worker subclass in cls._abc_negative_cache): 103*cda5da8dSAndroid Build Coastguard Worker return False 104*cda5da8dSAndroid Build Coastguard Worker # Fall back to the subclass check. 105*cda5da8dSAndroid Build Coastguard Worker return cls.__subclasscheck__(subclass) 106*cda5da8dSAndroid Build Coastguard Worker return any(cls.__subclasscheck__(c) for c in (subclass, subtype)) 107*cda5da8dSAndroid Build Coastguard Worker 108*cda5da8dSAndroid Build Coastguard Worker def __subclasscheck__(cls, subclass): 109*cda5da8dSAndroid Build Coastguard Worker """Override for issubclass(subclass, cls).""" 110*cda5da8dSAndroid Build Coastguard Worker if not isinstance(subclass, type): 111*cda5da8dSAndroid Build Coastguard Worker raise TypeError('issubclass() arg 1 must be a class') 112*cda5da8dSAndroid Build Coastguard Worker # Check cache 113*cda5da8dSAndroid Build Coastguard Worker if subclass in cls._abc_cache: 114*cda5da8dSAndroid Build Coastguard Worker return True 115*cda5da8dSAndroid Build Coastguard Worker # Check negative cache; may have to invalidate 116*cda5da8dSAndroid Build Coastguard Worker if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter: 117*cda5da8dSAndroid Build Coastguard Worker # Invalidate the negative cache 118*cda5da8dSAndroid Build Coastguard Worker cls._abc_negative_cache = WeakSet() 119*cda5da8dSAndroid Build Coastguard Worker cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter 120*cda5da8dSAndroid Build Coastguard Worker elif subclass in cls._abc_negative_cache: 121*cda5da8dSAndroid Build Coastguard Worker return False 122*cda5da8dSAndroid Build Coastguard Worker # Check the subclass hook 123*cda5da8dSAndroid Build Coastguard Worker ok = cls.__subclasshook__(subclass) 124*cda5da8dSAndroid Build Coastguard Worker if ok is not NotImplemented: 125*cda5da8dSAndroid Build Coastguard Worker assert isinstance(ok, bool) 126*cda5da8dSAndroid Build Coastguard Worker if ok: 127*cda5da8dSAndroid Build Coastguard Worker cls._abc_cache.add(subclass) 128*cda5da8dSAndroid Build Coastguard Worker else: 129*cda5da8dSAndroid Build Coastguard Worker cls._abc_negative_cache.add(subclass) 130*cda5da8dSAndroid Build Coastguard Worker return ok 131*cda5da8dSAndroid Build Coastguard Worker # Check if it's a direct subclass 132*cda5da8dSAndroid Build Coastguard Worker if cls in getattr(subclass, '__mro__', ()): 133*cda5da8dSAndroid Build Coastguard Worker cls._abc_cache.add(subclass) 134*cda5da8dSAndroid Build Coastguard Worker return True 135*cda5da8dSAndroid Build Coastguard Worker # Check if it's a subclass of a registered class (recursive) 136*cda5da8dSAndroid Build Coastguard Worker for rcls in cls._abc_registry: 137*cda5da8dSAndroid Build Coastguard Worker if issubclass(subclass, rcls): 138*cda5da8dSAndroid Build Coastguard Worker cls._abc_cache.add(subclass) 139*cda5da8dSAndroid Build Coastguard Worker return True 140*cda5da8dSAndroid Build Coastguard Worker # Check if it's a subclass of a subclass (recursive) 141*cda5da8dSAndroid Build Coastguard Worker for scls in cls.__subclasses__(): 142*cda5da8dSAndroid Build Coastguard Worker if issubclass(subclass, scls): 143*cda5da8dSAndroid Build Coastguard Worker cls._abc_cache.add(subclass) 144*cda5da8dSAndroid Build Coastguard Worker return True 145*cda5da8dSAndroid Build Coastguard Worker # No dice; update negative cache 146*cda5da8dSAndroid Build Coastguard Worker cls._abc_negative_cache.add(subclass) 147*cda5da8dSAndroid Build Coastguard Worker return False 148