xref: /aosp_15_r20/external/bazelbuild-rules_python/python/private/pypi/whl_installer/wheel.py (revision 60517a1edbc8ecf509223e9af94a7adec7d736b8)
1# Copyright 2023 The Bazel Authors. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Utility class to inspect an extracted wheel directory"""
16
17import email
18import re
19from collections import defaultdict
20from dataclasses import dataclass
21from pathlib import Path
22from typing import Dict, List, Optional, Set, Tuple
23
24import installer
25from packaging.requirements import Requirement
26from pip._vendor.packaging.utils import canonicalize_name
27
28from python.private.pypi.whl_installer.platform import (
29    Platform,
30    host_interpreter_minor_version,
31)
32
33
@dataclass(frozen=True)
class FrozenDeps:
    """Resolved dependencies of a wheel, as produced by ``Deps.build()``.

    The dataclass is frozen, so the two fields cannot be reassigned after
    construction.
    """

    # Dependency names that apply unconditionally; ``Deps.build()`` fills this
    # with the sorted contents of its common dependency set.
    deps: List[str]
    # Conditional dependencies; ``Deps.build()`` keys this by ``str(platform)``
    # and stores a sorted list of dependency names for each key.
    deps_select: Dict[str, List[str]]
38
39
class Deps:
    """Dependency builder that has a build() method to return FrozenDeps.

    The constructor parses the ``Requires-Dist`` entries of a wheel, resolves
    the requested extras (including extras pulled in through self-references)
    and sorts every dependency into either the common dependency set or a
    per-platform set. All dependency names are stored in normalized form, see
    ``_normalize``.
    """

    def __init__(
        self,
        name: str,
        requires_dist: List[str],
        *,
        extras: Optional[Set[str]] = None,
        platforms: Optional[Set[Platform]] = None,
    ):
        """Create a new instance and parse the requires_dist

        Args:
            name (str): The name of the whl distribution
            requires_dist (list[str]): The Requires-Dist from the METADATA of the whl
                distribution.
            extras (set[str], optional): The list of requested extras, defaults to None.
            platforms (set[Platform], optional): The list of target platforms, defaults to
                None. If the list of platforms has multiple `minor_version` values, it
                will change the code to generate the select statements using
                `@rules_python//python/config_settings:is_python_3.y` conditions.

        Raises:
            ValueError: if more than two distinct `minor_version` values are
                present and one of them is None.
        """
        self.name: str = Deps._normalize(name)
        self._platforms: Set[Platform] = platforms or set()
        self._target_versions = {p.minor_version for p in platforms or {}}
        self._default_minor_version = None
        # NOTE(review): the default minor version is only set for more than
        # two distinct target versions, while _maybe_add_common_dep already
        # applies at two. Confirm that two-version platform sets are meant to
        # take the single-version code paths in _add_req.
        if platforms and len(self._target_versions) > 2:
            # TODO @aignas 2024-06-23: enable this to be set via a CLI arg
            # for being more explicit.
            self._default_minor_version = host_interpreter_minor_version()

        if None in self._target_versions and len(self._target_versions) > 2:
            raise ValueError(
                f"all python versions need to be specified explicitly, got: {platforms}"
            )

        # Sort so that the dictionary order in the FrozenDeps is deterministic
        # without the final sort because Python retains insertion order. That way
        # the sorting by platform is limited within the Platform class itself and
        # the unit-tests for the Deps can be simpler.
        reqs = sorted(
            (Requirement(wheel_req) for wheel_req in requires_dist),
            key=lambda x: f"{x.name}:{sorted(x.extras)}",
        )

        want_extras = self._resolve_extras(reqs, extras)

        # Then add all of the requirements in order
        self._deps: Set[str] = set()
        self._select: Dict[Platform, Set[str]] = defaultdict(set)
        for req in reqs:
            self._add_req(req, want_extras)

    def _add(self, dep: str, platform: Optional[Platform]):
        """Record a single dependency.

        Args:
            dep: The dependency name; it is normalized before being stored.
            platform: The platform the dependency applies to. A falsy value
                means the dependency is common to all platforms.
        """
        dep = Deps._normalize(dep)

        # Self-edges are processed in _resolve_extras
        if dep == self.name:
            return

        if not platform:
            self._deps.add(dep)

            # If the dep is in the platform-specific list, remove it from the select.
            pop_keys = []
            for p, deps in self._select.items():
                if dep not in deps:
                    continue

                deps.remove(dep)
                if not deps:
                    # Keys are dropped after the loop so that the dict is not
                    # resized while it is being iterated.
                    pop_keys.append(p)

            for p in pop_keys:
                self._select.pop(p)
            return

        if dep in self._deps:
            # If the dep is already in the main dependency list, no need to add it in the
            # platform-specific dependency list.
            return

        # Add the platform-specific dep
        self._select[platform].add(dep)

        # Add the dep to specializations of the given platform if they
        # exist in the select statement.
        for p in platform.all_specializations():
            if p not in self._select:
                continue

            self._select[p].add(dep)

        if len(self._select[platform]) == 1:
            # We are adding a new item to the select and we need to ensure that
            # existing dependencies from less specialized platforms are propagated
            # to the newly added dependency set.
            for p, deps in self._select.items():
                # Check if the existing platform overlaps with the given platform
                if p == platform or platform not in p.all_specializations():
                    continue

                self._select[platform].update(self._select[p])

    def _maybe_add_common_dep(self, dep):
        """Move ``dep`` to the common deps if every python version branch has it.

        Nothing happens when fewer than two target versions are configured, or
        when the unversioned ``Platform()`` branch or any of the
        ``Platform(minor_version=v)`` branches lacks the dependency.

        Args:
            dep: The dependency name. It is normalized here because callers
                pass the raw requirement name, while ``_add`` stores the
                normalized names that this method has to look up.
        """
        if len(self._target_versions) < 2:
            return

        dep = Deps._normalize(dep)

        platforms = [Platform()] + [
            Platform(minor_version=v) for v in self._target_versions
        ]

        # If the dep is targeting all target python versions, lets add it to
        # the common dependency list to simplify the select statements.
        for p in platforms:
            if p not in self._select:
                return

            if dep not in self._select[p]:
                return

        # All of the python version-specific branches have the dep, so lets add
        # it to the common deps.
        self._deps.add(dep)
        for p in platforms:
            self._select[p].remove(dep)
            if not self._select[p]:
                self._select.pop(p)

    @staticmethod
    def _normalize(name: str) -> str:
        """Return ``name`` lowercased with each run of ``-``, ``_``, ``.`` replaced by ``_``."""
        return re.sub(r"[-_.]+", "_", name).lower()

    def _resolve_extras(
        self, reqs: List[Requirement], extras: Optional[Set[str]]
    ) -> Set[str]:
        """Resolve extras which are due to depending on self[some_other_extra].

        Some packages may have cyclic dependencies resulting from extras being used, one example is
        `etils`, where we have one set of extras as aliases for other extras
        and we have an extra called 'all' that includes all other extras.

        Example: github.com/google/etils/blob/a0b71032095db14acf6b33516bca6d885fe09e35/pyproject.toml#L32.

        When the `requirements.txt` is generated by `pip-tools`, then it is likely that
        this step is not needed, but for other `requirements.txt` files this may be useful.

        NOTE @aignas 2023-12-08: the extra resolution is not platform dependent,
        but in order for it to become platform dependent we would have to have
        separate targets for each extra in extras.

        Args:
            reqs: All parsed requirements of the distribution.
            extras: The requested extras; None or an empty set means no extras.

        Returns:
            The requested extras plus every extra reachable through
            self-referencing requirements. The input set is not mutated.
        """

        # Resolve any extra extras due to self-edges, empty string means no
        # extras The empty string in the set is just a way to make the handling
        # of no extras and a single extra easier and having a set of {"", "foo"}
        # is equivalent to having {"foo"}.
        extras = extras or {""}

        pending = []
        for req in reqs:
            if Deps._normalize(req.name) != self.name:
                continue

            if req.marker is None:
                # I am pretty sure we cannot reach this code as it does not
                # make sense to specify packages in this way, but since it is
                # easy to handle, lets do it.
                #
                # TODO @aignas 2023-12-08: add a test
                extras = extras | req.extras
            else:
                # process these in the fixpoint loop below
                pending.append(req)

        # Keep sweeping over the self-edges that are not yet active until a
        # full sweep activates nothing new. The set of extras only grows, so a
        # requirement that matched once keeps matching and can be dropped from
        # later sweeps. A fixed number of passes is not enough, because a
        # chain of aliases may be sorted in the reverse of its activation
        # order.
        while pending:
            inactive = []
            for req in pending:
                if any(req.marker.evaluate({"extra": extra}) for extra in extras):
                    extras = extras | req.extras
                else:
                    inactive.append(req)

            if len(inactive) == len(pending):
                break
            pending = inactive

        return extras

    def _add_req(self, req: Requirement, extras: Set[str]) -> None:
        """Add one requirement to the common or platform-specific deps.

        Requirements without a marker are always common. Requirements whose
        marker mentions neither OS, architecture nor version tags, or any
        requirement when no target platforms are configured, are added as
        common when the marker matches at least one of ``extras``. Otherwise
        the marker is evaluated for each target platform and the dependency is
        recorded under a platform key that is only as specific as the marker
        requires.

        Args:
            req: The parsed requirement.
            extras: The resolved extras, as returned by ``_resolve_extras``.
        """
        if req.marker is None:
            self._add(req.name, None)
            return

        if not self._platforms:
            if any(req.marker.evaluate({"extra": extra}) for extra in extras):
                self._add(req.name, None)
            return

        marker_str = str(req.marker)

        # NOTE @aignas 2023-12-08: in order to have reasonable select statements
        # we do have to have some parsing of the markers, so it begs the question
        # if packaging should be reimplemented in Starlark to have the best solution
        # for now we will implement it in Python and see what the best parsing result
        # can be before making this decision.
        match_os = any(
            tag in marker_str
            for tag in [
                "os_name",
                "sys_platform",
                "platform_system",
            ]
        )
        match_arch = "platform_machine" in marker_str
        match_version = "version" in marker_str

        if not (match_os or match_arch or match_version):
            if any(req.marker.evaluate({"extra": extra}) for extra in extras):
                self._add(req.name, None)
            return

        for plat in self._platforms:
            if not any(
                req.marker.evaluate(plat.env_markers(extra)) for extra in extras
            ):
                continue

            # The branches go from the most specific key (arch) to the least
            # specific one (version only). When a default minor version is
            # set, the dep is also registered under the unversioned key for
            # platforms that use the default version.
            if match_arch and self._default_minor_version:
                self._add(req.name, plat)
                if plat.minor_version == self._default_minor_version:
                    self._add(req.name, Platform(plat.os, plat.arch))
            elif match_arch:
                self._add(req.name, Platform(plat.os, plat.arch))
            elif match_os and self._default_minor_version:
                self._add(req.name, Platform(plat.os, minor_version=plat.minor_version))
                if plat.minor_version == self._default_minor_version:
                    self._add(req.name, Platform(plat.os))
            elif match_os:
                self._add(req.name, Platform(plat.os))
            elif match_version and self._default_minor_version:
                self._add(req.name, Platform(minor_version=plat.minor_version))
                if plat.minor_version == self._default_minor_version:
                    self._add(req.name, Platform())
            elif match_version:
                self._add(req.name, None)

        # Merge to common if possible after processing all platforms
        self._maybe_add_common_dep(req.name)

    def build(self) -> FrozenDeps:
        """Return the collected dependencies as a ``FrozenDeps``.

        The common deps and each per-platform list are sorted; the select keys
        are the ``str()`` of each platform and keep their insertion order.
        """
        return FrozenDeps(
            deps=sorted(self._deps),
            deps_select={str(p): sorted(deps) for p, deps in self._select.items()},
        )
296
297
class Wheel:
    """Representation of the compressed .whl file"""

    def __init__(self, path: Path):
        """Remember the location of the wheel; the file is not opened here.

        Args:
            path: Path to the .whl file.
        """
        self._path = path

    @property
    def path(self) -> Path:
        """The path this instance was created with."""
        return self._path

    @property
    def name(self) -> str:
        """The ``Name`` from the wheel METADATA, passed through ``canonicalize_name``."""
        # TODO Also available as installer.sources.WheelSource.distribution
        name = str(self.metadata["Name"])
        return canonicalize_name(name)

    @property
    def metadata(self) -> email.message.Message:
        """The parsed ``METADATA`` file from the wheel's dist-info directory.

        The wheel is opened and the file parsed on every access; nothing is
        cached on the instance.
        """
        with installer.sources.WheelFile.open(self.path) as wheel_source:
            metadata_contents = wheel_source.read_dist_info("METADATA")
            metadata = installer.utils.parse_metadata_file(metadata_contents)
        return metadata

    @property
    def version(self) -> str:
        """The ``Version`` from the wheel METADATA as a string."""
        # TODO Also available as installer.sources.WheelSource.version
        return str(self.metadata["Version"])

    def entry_points(self) -> Dict[str, Tuple[str, str]]:
        """Returns the entrypoints defined in the current wheel

        See https://packaging.python.org/specifications/entry-points/ for more info

        Only entry points whose section is reported as "console" are included.

        Returns:
            Dict[str, Tuple[str, str]]: A mapping of the entry point's name to its module and attribute
        """
        with installer.sources.WheelFile.open(self.path) as wheel_source:
            if "entry_points.txt" not in wheel_source.dist_info_filenames:
                return {}

            entry_points_contents = wheel_source.read_dist_info("entry_points.txt")
            entry_points = installer.utils.parse_entrypoints(entry_points_contents)
            return {
                script: (module, attribute)
                for script, module, attribute, script_section in entry_points
                if script_section == "console"
            }

    def dependencies(
        self,
        extras_requested: Optional[Set[str]] = None,
        platforms: Optional[Set[Platform]] = None,
    ) -> FrozenDeps:
        """Resolve the ``Requires-Dist`` entries of the wheel.

        Args:
            extras_requested: The extras to resolve dependencies for, defaults
                to None.
            platforms: The target platforms forwarded to ``Deps``, defaults to
                None.

        Returns:
            FrozenDeps: the result of ``Deps(...).build()`` for this wheel.
        """
        return Deps(
            self.name,
            extras=extras_requested,
            platforms=platforms,
            requires_dist=self.metadata.get_all("Requires-Dist", []),
        ).build()

    def unzip(self, directory: str) -> None:
        """Install the wheel contents below ``directory`` using ``installer``.

        Args:
            directory: Passed to ``installer`` as the ``destdir``; the scheme
                paths below are given relative to it.
        """
        installation_schemes = {
            "purelib": "/site-packages",
            "platlib": "/site-packages",
            "headers": "/include",
            "scripts": "/bin",
            "data": "/data",
        }
        destination = installer.destinations.SchemeDictionaryDestination(
            installation_schemes,
            # TODO Should entry_point scripts also be handled by installer rather than custom code?
            interpreter="/dev/null",
            script_kind="posix",
            destdir=directory,
            bytecode_optimization_levels=[],
        )

        with installer.sources.WheelFile.open(self.path) as wheel_source:
            installer.install(
                source=wheel_source,
                destination=destination,
                additional_metadata={
                    "INSTALLER": b"https://github.com/bazelbuild/rules_python",
                },
            )
384