xref: /aosp_15_r20/external/pigweed/pw_package/py/pw_package/git_repo.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Install and check status of Git repository-based packages."""
15
16import logging
17import os
18from pathlib import Path
19import shutil
20import subprocess
21import urllib.parse
22
23import pw_package.package_manager
24
25_LOG: logging.Logger = logging.getLogger(__name__)
26
27_GIT_CONFIG = [
28    # Suppress "You are in 'detached HEAD' state" message
29    '-c',
30    'advice.detachedHead=false',
31]
32
33
def git_stdout(
    *args: Path | str, show_stderr=False, repo: Path | str = '.'
) -> str:
    """Runs a git command and returns what it printed on stdout.

    Args:
      *args: Arguments placed after ``git <config options> -C <repo>``.
      show_stderr: If true, git's stderr is inherited from this process;
        otherwise it is discarded.
      repo: Directory handed to ``git -C``.

    Returns:
      The command's stdout, decoded and with surrounding whitespace stripped.

    Raises:
      subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    _LOG.debug('executing %r in %r', args, repo)
    command: list[Path | str] = ['git', *_GIT_CONFIG, '-C', repo, *args]
    stderr_target = None if show_stderr else subprocess.DEVNULL
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        check=True,
    )
    raw_output = completed.stdout
    return raw_output.decode().strip()
48
49
def git(
    *args: Path | str, repo: Path | str = '.'
) -> subprocess.CompletedProcess:
    """Runs a git command, leaving its stdout and stderr attached.

    Args:
      *args: Arguments placed after ``git <config options> -C <repo>``.
      repo: Directory handed to ``git -C``.

    Returns:
      The ``subprocess.CompletedProcess`` for the finished command.

    Raises:
      subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    _LOG.debug('executing %r in %r', args, repo)
    command: list[Path | str] = ['git', *_GIT_CONFIG, '-C', repo, *args]
    return subprocess.run(command, check=True)
57
58
class GitRepo(pw_package.package_manager.Package):
    """Install and check status of Git repository-based packages.

    A package is pinned to a commit and/or a tag of a remote repository and
    may optionally be limited to a sparse checkout.
    """

    def __init__(
        self, url, *args, commit='', tag='', sparse_list=None, **kwargs
    ):
        """Records the remote and the revision this package is pinned to.

        Args:
          url: URL of the remote; status() compares it with the 'origin'
            remote of an existing checkout.
          *args: Forwarded to the base class constructor.
          commit: Commit to check out. When both commit and tag are given,
            the checkout methods use the commit.
          tag: Tag to check out.
          sparse_list: Optional entries for a sparse checkout. When truthy,
            install() performs a sparse checkout instead of a full clone.
          **kwargs: Forwarded to the base class constructor.

        Raises:
          ValueError: If neither commit nor tag is provided.
        """
        super().__init__(*args, **kwargs)
        if not (commit or tag):
            raise ValueError('git repo must specify a commit or tag')

        self._url = url
        self._commit = commit
        self._tag = tag
        self._sparse_list = sparse_list
        # NOTE(review): presumably read by the base Package class or the
        # package manager; its consumer is not visible in this file.
        self._allow_use_in_downstream = False

    def status(self, path: Path) -> bool:
        """Returns True if path holds a clean checkout of the pinned revision.

        A checkout that git cannot be queried about (for example one with no
        'origin' remote, or no tags when a tag is expected) is reported as not
        installed rather than raising, so install() can replace it.

        Args:
          path: Directory where the package is expected to be checked out.
        """
        _LOG.debug('%s: status', self.name)
        if not os.path.isdir(path / '.git'):
            _LOG.debug('%s: no .git folder', self.name)
            return False

        try:
            return self._checkout_matches(path)
        except subprocess.CalledProcessError as err:
            _LOG.debug(
                '%s: git query failed, treating as not installed: %s',
                self.name,
                err,
            )
            return False

    def _checkout_matches(self, path: Path) -> bool:
        """Compares the checkout at path with the expected remote/revision.

        Raises:
          subprocess.CalledProcessError: If any of the git queries fails.
        """
        remote = git_stdout('remote', 'get-url', 'origin', repo=path)
        url = urllib.parse.urlparse(remote)
        # Rewrite sso:// and *.git.corp.google.com remotes to the equivalent
        # https://<host>.googlesource.com form before comparing.
        if url.scheme == 'sso' or '.git.corp.google.com' in url.netloc:
            host = url.netloc.replace(
                '.git.corp.google.com',
                '.googlesource.com',
            )
            if not host.endswith('.googlesource.com'):
                host += '.googlesource.com'
            remote = f'https://{host}{url.path}'
        if remote != self._url:
            _LOG.debug(
                "%s: remote doesn't match expected %s actual %s",
                self.name,
                self._url,
                remote,
            )
            return False

        # A commit of 'HEAD' matches whatever is currently checked out.
        commit = git_stdout('rev-parse', 'HEAD', repo=path)
        if self._commit and self._commit != 'HEAD' and self._commit != commit:
            _LOG.debug(
                "%s: commits don't match expected %s actual %s",
                self.name,
                self._commit,
                commit,
            )
            return False

        if self._tag:
            tag = git_stdout('describe', '--tags', repo=path)
            if self._tag != tag:
                _LOG.debug(
                    "%s: tags don't match expected %s actual %s",
                    self.name,
                    self._tag,
                    tag,
                )
                return False

        # If it is a sparse checkout, sparse list shall match.
        if self._sparse_list:
            if not self.check_sparse_list(path):
                _LOG.debug("%s: sparse lists don't match", self.name)
                return False

        # Any porcelain output means the working tree has local changes.
        status = git_stdout('status', '--porcelain=v1', repo=path)
        _LOG.debug('%s: status %r', self.name, status)
        return not status

    def install(self, path: Path) -> None:
        """Checks the package out at path unless status() already passes.

        Any existing directory at path that does not pass status() is deleted
        before the repository is checked out again.

        Args:
          path: Directory to install the package into.
        """
        _LOG.debug('%s: install', self.name)
        # If already installed and at correct version exit now.
        if self.status(path):
            _LOG.debug('%s: already installed, exiting', self.name)
            return

        # Otherwise delete current version and clone again.
        if os.path.isdir(path):
            _LOG.debug('%s: removing', self.name)
            shutil.rmtree(path)

        if self._sparse_list:
            self.checkout_sparse(path)
        else:
            self.checkout_full(path)

    def checkout_full(self, path: Path) -> None:
        """Clones the repository into path at the pinned commit or tag.

        Args:
          path: Destination directory for the clone.
        """
        # --filter=blob:none makes a partial ("blobless") clone: file contents
        # are not all downloaded up front and are fetched on demand when a
        # later command needs them. For small repositories the effect is
        # negligible but for large repositories this should be a significant
        # improvement.
        # --filter=... causes progress messages to be printed to stderr even if
        # using --quiet so we wrap our clone command in `git_stdout` to prevent
        # the output from being emitted.
        _LOG.debug('%s: checkout_full', self.name)
        if self._commit:
            git_stdout('clone', '--filter=blob:none', self._url, path)
            git('reset', '--hard', self._commit, repo=path)
        elif self._tag:
            git_stdout(
                'clone', '-b', self._tag, '--filter=blob:none', self._url, path
            )

    def checkout_sparse(self, path: Path) -> None:
        """Creates a sparse checkout of the pinned commit or tag at path.

        Args:
          path: Destination directory for the checkout.
        """
        _LOG.debug('%s: checkout_sparse', self.name)
        # sparse checkout
        git('init', path)
        git('remote', 'add', 'origin', self._url, repo=path)
        git('config', 'core.sparseCheckout', 'true', repo=path)

        # Add files to checkout by editing .git/info/sparse-checkout. The
        # info directory is created if `git init` did not provide it, and the
        # file is written as UTF-8 regardless of the platform default.
        info_dir = path / '.git' / 'info'
        info_dir.mkdir(parents=True, exist_ok=True)
        with open(
            info_dir / 'sparse-checkout', 'w', encoding='utf-8'
        ) as sparse:
            sparse.writelines(source + '\n' for source in self._sparse_list)

        # Either pull from a commit or a tag.
        target = self._commit if self._commit else self._tag
        git('pull', '--depth=1', 'origin', target, repo=path)

    def check_sparse_list(self, path: Path) -> bool:
        """Returns True if the sparse-checkout entries at path match ours.

        The comparison ignores order and duplicates.

        Args:
          path: Directory of the existing checkout.
        """
        # git_stdout() already strips surrounding whitespace.
        sparse_list = git_stdout(
            'sparse-checkout', 'list', repo=path
        ).splitlines()
        return set(sparse_list) == set(self._sparse_list)
189