import io
import posixpath
import zipfile
import itertools
import contextlib
import sys
import pathlib

if sys.version_info < (3, 7):
    from collections import OrderedDict
else:
    OrderedDict = dict


__all__ = ['Path']


def _parents(path):
    """
    Given a path with elements separated by
    posixpath.sep, generate all parents of that path.

    >>> list(_parents('b/d'))
    ['b']
    >>> list(_parents('/b/d/'))
    ['/b']
    >>> list(_parents('b/d/f/'))
    ['b/d', 'b']
    >>> list(_parents('b'))
    []
    >>> list(_parents(''))
    []
    """
    # The first element of the ancestry is the path itself; skip it.
    return itertools.islice(_ancestry(path), 1, None)


def _ancestry(path):
    """
    Given a path with elements separated by
    posixpath.sep, generate all elements of that path

    >>> list(_ancestry('b/d'))
    ['b/d', 'b']
    >>> list(_ancestry('/b/d/'))
    ['/b/d', '/b']
    >>> list(_ancestry('b/d/f/'))
    ['b/d/f', 'b/d', 'b']
    >>> list(_ancestry('b'))
    ['b']
    >>> list(_ancestry(''))
    []
    """
    path = path.rstrip(posixpath.sep)
    # Stop once the path is exhausted or only the root separator remains.
    while path and path != posixpath.sep:
        yield path
        path, _tail = posixpath.split(path)


_dedupe = OrderedDict.fromkeys
"""Deduplicate an iterable in original order"""


def _difference(minuend, subtrahend):
    """
    Return items in minuend not in subtrahend, retaining order
    with O(1) lookup.
    """
    return itertools.filterfalse(set(subtrahend).__contains__, minuend)


class CompleteDirs(zipfile.ZipFile):
    """
    A ZipFile subclass that ensures that implied directories
    are always included in the namelist.
    """

    @staticmethod
    def _implied_dirs(names):
        """
        Return the directory names (with trailing slash) that are
        parents of entries in ``names`` but are not themselves
        present in ``names``, de-duplicated in first-seen order.
        """
        parents = itertools.chain.from_iterable(map(_parents, names))
        as_dirs = (p + posixpath.sep for p in parents)
        return _dedupe(_difference(as_dirs, names))

    def namelist(self):
        """
        Return the archive's member names followed by any
        implied directory names.
        """
        names = super().namelist()
        return names + list(self._implied_dirs(names))

    def _name_set(self):
        """Return ``namelist()`` as a set for membership tests."""
        return set(self.namelist())

    def resolve_dir(self, name):
        """
        If the name represents a directory, return that name
        as a directory (with the trailing slash).
        """
        names = self._name_set()
        dirname = name + '/'
        dir_match = name not in names and dirname in names
        return dirname if dir_match else name

    @classmethod
    def make(cls, source):
        """
        Given a source (filename or zipfile), return an
        appropriate CompleteDirs subclass.

        An existing ``CompleteDirs`` instance is returned as-is.
        Any other ``ZipFile`` instance has its ``__class__``
        reassigned in place; anything else is treated as a
        filename and opened with ``cls``.
        """
        if isinstance(source, CompleteDirs):
            return source

        if not isinstance(source, zipfile.ZipFile):
            return cls(_pathlib_compat(source))

        # Only allow for FastLookup when supplied zipfile is read-only
        if 'r' not in source.mode:
            cls = CompleteDirs

        source.__class__ = cls
        return source


class FastLookup(CompleteDirs):
    """
    ZipFile subclass to ensure implicit
    dirs exist and are resolved rapidly.
    """

    def namelist(self):
        """Return the name list, computing it only on first use."""
        # A missing cache attribute raises AttributeError, which is
        # suppressed so control falls through to populate the cache.
        with contextlib.suppress(AttributeError):
            return self.__names
        self.__names = super().namelist()
        return self.__names

    def _name_set(self):
        """Return the name set, computing it only on first use."""
        with contextlib.suppress(AttributeError):
            return self.__lookup
        self.__lookup = super()._name_set()
        return self.__lookup


def _pathlib_compat(path):
    """
    For path-like objects, convert to a filename for compatibility
    on Python 3.6.1 and earlier.
    """
    try:
        return path.__fspath__()
    except AttributeError:
        return str(path)


class Path:
    """
    A pathlib-compatible interface for zip files.

    Consider a zip file with this structure::

        .
        ├── a.txt
        └── b
            ├── c.txt
            └── d
                └── e.txt

    >>> data = io.BytesIO()
    >>> zf = zipfile.ZipFile(data, 'w')
    >>> zf.writestr('a.txt', 'content of a')
    >>> zf.writestr('b/c.txt', 'content of c')
    >>> zf.writestr('b/d/e.txt', 'content of e')
    >>> zf.filename = 'mem/abcde.zip'

    Path accepts the zipfile object itself or a filename

    >>> root = Path(zf)

    From there, several path operations are available.

    Directory iteration (including the zip file itself):

    >>> a, b = root.iterdir()
    >>> a
    Path('mem/abcde.zip', 'a.txt')
    >>> b
    Path('mem/abcde.zip', 'b/')

    name property:

    >>> b.name
    'b'

    join with divide operator:

    >>> c = b / 'c.txt'
    >>> c
    Path('mem/abcde.zip', 'b/c.txt')
    >>> c.name
    'c.txt'

    Read text:

    >>> c.read_text()
    'content of c'

    existence:

    >>> c.exists()
    True
    >>> (b / 'missing.txt').exists()
    False

    Coercion to string:

    >>> import os
    >>> str(c).replace(os.sep, posixpath.sep)
    'mem/abcde.zip/b/c.txt'

    At the root, ``name``, ``filename``, and ``parent``
    resolve to the zipfile. Note these attributes are not
    valid and will raise a ``TypeError`` if the zipfile
    has no filename.

    >>> root.name
    'abcde.zip'
    >>> str(root.filename).replace(os.sep, posixpath.sep)
    'mem/abcde.zip'
    >>> str(root.parent)
    'mem'
    """

    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

    def __init__(self, root, at=""):
        """
        Construct a Path from a ZipFile or filename.

        Note: When the source is an existing ZipFile object,
        its type (__class__) will be mutated to a
        specialized type. If the caller wishes to retain the
        original type, the caller should either create a
        separate ZipFile object or pass a filename.
        """
        self.root = FastLookup.make(root)
        self.at = at

    def open(self, mode='r', *args, pwd=None, **kwargs):
        """
        Open this entry as text or binary following the semantics
        of ``pathlib.Path.open()`` by passing arguments through
        to io.TextIOWrapper().

        Only the first character of ``mode`` is passed to the
        underlying ``ZipFile.open``; a ``'b'`` anywhere in ``mode``
        selects binary output.

        Raises ``IsADirectoryError`` for a directory,
        ``FileNotFoundError`` when reading a missing entry, and
        ``ValueError`` when text-wrapper arguments are supplied
        for a binary open.
        """
        if self.is_dir():
            raise IsADirectoryError(self)
        zip_mode = mode[0]
        if not self.exists() and zip_mode == 'r':
            raise FileNotFoundError(self)
        binary = 'b' in mode
        # Validate before touching the archive so that a rejected call
        # neither leaks an open stream nor (in write mode) leaves a
        # dangling writing handle / empty member behind.
        if binary and (args or kwargs):
            raise ValueError("encoding args invalid for binary operation")
        stream = self.root.open(self.at, zip_mode, pwd=pwd)
        if binary:
            return stream
        return io.TextIOWrapper(stream, *args, **kwargs)

    # Member names inside an archive always use '/' as the separator,
    # so they are parsed with PurePosixPath regardless of the host OS.

    @property
    def name(self):
        """Final path component, or the zipfile's name at the root."""
        return pathlib.PurePosixPath(self.at).name or self.filename.name

    @property
    def suffix(self):
        """File extension of the entry, or of the zipfile at the root."""
        return pathlib.PurePosixPath(self.at).suffix or self.filename.suffix

    @property
    def suffixes(self):
        """List of extensions of the entry, or of the zipfile at the root."""
        return pathlib.PurePosixPath(self.at).suffixes or self.filename.suffixes

    @property
    def stem(self):
        """Final component without its suffix, or the zipfile's stem."""
        return pathlib.PurePosixPath(self.at).stem or self.filename.stem

    @property
    def filename(self):
        """The zipfile's filename joined with this entry's location."""
        return pathlib.Path(self.root.filename).joinpath(self.at)

    def read_text(self, *args, **kwargs):
        """Return the entry's content as text; args go to ``open``."""
        with self.open('r', *args, **kwargs) as strm:
            return strm.read()

    def read_bytes(self):
        """Return the entry's content as bytes."""
        with self.open('rb') as strm:
            return strm.read()

    def _is_child(self, path):
        """Return True if ``path`` is directly contained in this entry."""
        return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")

    def _next(self, at):
        """Return a new path of the same class and root located at ``at``."""
        return self.__class__(self.root, at)

    def is_dir(self):
        """Return True at the root or when the location ends with '/'."""
        return not self.at or self.at.endswith("/")

    def is_file(self):
        """Return True if the entry exists and is not a directory."""
        return self.exists() and not self.is_dir()

    def exists(self):
        """Return True if the location is among the archive's names."""
        return self.at in self.root._name_set()

    def iterdir(self):
        """
        Iterate over the direct children of this directory.

        Raises ``ValueError`` if this path is not a directory.
        """
        if not self.is_dir():
            raise ValueError("Can't listdir a file")
        subs = map(self._next, self.root.namelist())
        return filter(self._is_child, subs)

    def __str__(self):
        return posixpath.join(self.root.filename, self.at)

    def __repr__(self):
        return self.__repr.format(self=self)

    def joinpath(self, *other):
        """
        Return a new path with ``other`` segments appended.

        The joined name gains a trailing slash when it names a
        directory in the archive (see ``resolve_dir``).
        """
        joined = posixpath.join(self.at, *map(_pathlib_compat, other))
        return self._next(self.root.resolve_dir(joined))

    __truediv__ = joinpath

    @property
    def parent(self):
        """
        The containing directory as a ``Path``; at the root, the
        ``pathlib.Path`` of the directory holding the zipfile.
        """
        if not self.at:
            return self.filename.parent
        parent_at = posixpath.dirname(self.at.rstrip('/'))
        if parent_at:
            parent_at += '/'
        return self._next(parent_at)