1import collections
2import operator
3import pathlib
4import zipfile
5
6from . import abc
7
8from ._itertools import unique_everseen
9
10
11def remove_duplicates(items):
12    return iter(collections.OrderedDict.fromkeys(items))
13
14
15class FileReader(abc.TraversableResources):
16    def __init__(self, loader):
17        self.path = pathlib.Path(loader.path).parent
18
19    def resource_path(self, resource):
20        """
21        Return the file system path to prevent
22        `resources.path()` from creating a temporary
23        copy.
24        """
25        return str(self.path.joinpath(resource))
26
27    def files(self):
28        return self.path
29
30
31class ZipReader(abc.TraversableResources):
32    def __init__(self, loader, module):
33        _, _, name = module.rpartition('.')
34        self.prefix = loader.prefix.replace('\\', '/') + name + '/'
35        self.archive = loader.archive
36
37    def open_resource(self, resource):
38        try:
39            return super().open_resource(resource)
40        except KeyError as exc:
41            raise FileNotFoundError(exc.args[0])
42
43    def is_resource(self, path):
44        # workaround for `zipfile.Path.is_file` returning true
45        # for non-existent paths.
46        target = self.files().joinpath(path)
47        return target.is_file() and target.exists()
48
49    def files(self):
50        return zipfile.Path(self.archive, self.prefix)
51
52
53class MultiplexedPath(abc.Traversable):
54    """
55    Given a series of Traversable objects, implement a merged
56    version of the interface across all objects. Useful for
57    namespace packages which may be multihomed at a single
58    name.
59    """
60
61    def __init__(self, *paths):
62        self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
63        if not self._paths:
64            message = 'MultiplexedPath must contain at least one path'
65            raise FileNotFoundError(message)
66        if not all(path.is_dir() for path in self._paths):
67            raise NotADirectoryError('MultiplexedPath only supports directories')
68
69    def iterdir(self):
70        files = (file for path in self._paths for file in path.iterdir())
71        return unique_everseen(files, key=operator.attrgetter('name'))
72
73    def read_bytes(self):
74        raise FileNotFoundError(f'{self} is not a file')
75
76    def read_text(self, *args, **kwargs):
77        raise FileNotFoundError(f'{self} is not a file')
78
79    def is_dir(self):
80        return True
81
82    def is_file(self):
83        return False
84
85    def joinpath(self, child):
86        # first try to find child in current paths
87        for file in self.iterdir():
88            if file.name == child:
89                return file
90        # if it does not exist, construct it with the first path
91        return self._paths[0] / child
92
93    __truediv__ = joinpath
94
95    def open(self, *args, **kwargs):
96        raise FileNotFoundError(f'{self} is not a file')
97
98    @property
99    def name(self):
100        return self._paths[0].name
101
102    def __repr__(self):
103        paths = ', '.join(f"'{path}'" for path in self._paths)
104        return f'MultiplexedPath({paths})'
105
106
107class NamespaceReader(abc.TraversableResources):
108    def __init__(self, namespace_path):
109        if 'NamespacePath' not in str(namespace_path):
110            raise ValueError('Invalid path')
111        self.path = MultiplexedPath(*list(namespace_path))
112
113    def resource_path(self, resource):
114        """
115        Return the file system path to prevent
116        `resources.path()` from creating a temporary
117        copy.
118        """
119        return str(self.path.joinpath(resource))
120
121    def files(self):
122        return self.path
123