1import collections 2import operator 3import pathlib 4import zipfile 5 6from . import abc 7 8from ._itertools import unique_everseen 9 10 11def remove_duplicates(items): 12 return iter(collections.OrderedDict.fromkeys(items)) 13 14 15class FileReader(abc.TraversableResources): 16 def __init__(self, loader): 17 self.path = pathlib.Path(loader.path).parent 18 19 def resource_path(self, resource): 20 """ 21 Return the file system path to prevent 22 `resources.path()` from creating a temporary 23 copy. 24 """ 25 return str(self.path.joinpath(resource)) 26 27 def files(self): 28 return self.path 29 30 31class ZipReader(abc.TraversableResources): 32 def __init__(self, loader, module): 33 _, _, name = module.rpartition('.') 34 self.prefix = loader.prefix.replace('\\', '/') + name + '/' 35 self.archive = loader.archive 36 37 def open_resource(self, resource): 38 try: 39 return super().open_resource(resource) 40 except KeyError as exc: 41 raise FileNotFoundError(exc.args[0]) 42 43 def is_resource(self, path): 44 # workaround for `zipfile.Path.is_file` returning true 45 # for non-existent paths. 46 target = self.files().joinpath(path) 47 return target.is_file() and target.exists() 48 49 def files(self): 50 return zipfile.Path(self.archive, self.prefix) 51 52 53class MultiplexedPath(abc.Traversable): 54 """ 55 Given a series of Traversable objects, implement a merged 56 version of the interface across all objects. Useful for 57 namespace packages which may be multihomed at a single 58 name. 59 """ 60 61 def __init__(self, *paths): 62 self._paths = list(map(pathlib.Path, remove_duplicates(paths))) 63 if not self._paths: 64 message = 'MultiplexedPath must contain at least one path' 65 raise FileNotFoundError(message) 66 if not all(path.is_dir() for path in self._paths): 67 raise NotADirectoryError('MultiplexedPath only supports directories') 68 69 def iterdir(self): 70 files = (file for path in self._paths for file in path.iterdir()) 71 return unique_everseen(files, key=operator.attrgetter('name')) 72 73 def read_bytes(self): 74 raise FileNotFoundError(f'{self} is not a file') 75 76 def read_text(self, *args, **kwargs): 77 raise FileNotFoundError(f'{self} is not a file') 78 79 def is_dir(self): 80 return True 81 82 def is_file(self): 83 return False 84 85 def joinpath(self, child): 86 # first try to find child in current paths 87 for file in self.iterdir(): 88 if file.name == child: 89 return file 90 # if it does not exist, construct it with the first path 91 return self._paths[0] / child 92 93 __truediv__ = joinpath 94 95 def open(self, *args, **kwargs): 96 raise FileNotFoundError(f'{self} is not a file') 97 98 @property 99 def name(self): 100 return self._paths[0].name 101 102 def __repr__(self): 103 paths = ', '.join(f"'{path}'" for path in self._paths) 104 return f'MultiplexedPath({paths})' 105 106 107class NamespaceReader(abc.TraversableResources): 108 def __init__(self, namespace_path): 109 if 'NamespacePath' not in str(namespace_path): 110 raise ValueError('Invalid path') 111 self.path = MultiplexedPath(*list(namespace_path)) 112 113 def resource_path(self, resource): 114 """ 115 Return the file system path to prevent 116 `resources.path()` from creating a temporary 117 copy. 118 """ 119 return str(self.path.joinpath(resource)) 120 121 def files(self): 122 return self.path 123