xref: /aosp_15_r20/external/google-cloud-java/owl-bot-postprocessor/synthtool/transforms.py (revision 55e87721aa1bc457b326496a7ca40f3ea1a63287)
1*55e87721SMatt Gilbride# Copyright 2018 Google LLC
2*55e87721SMatt Gilbride#
3*55e87721SMatt Gilbride# Licensed under the Apache License, Version 2.0 (the "License");
4*55e87721SMatt Gilbride# you may not use this file except in compliance with the License.
5*55e87721SMatt Gilbride# You may obtain a copy of the License at
6*55e87721SMatt Gilbride#
7*55e87721SMatt Gilbride#     https://www.apache.org/licenses/LICENSE-2.0
8*55e87721SMatt Gilbride#
9*55e87721SMatt Gilbride# Unless required by applicable law or agreed to in writing, software
10*55e87721SMatt Gilbride# distributed under the License is distributed on an "AS IS" BASIS,
11*55e87721SMatt Gilbride# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*55e87721SMatt Gilbride# See the License for the specific language governing permissions and
13*55e87721SMatt Gilbride# limitations under the License.
14*55e87721SMatt Gilbride
15*55e87721SMatt Gilbridefrom pathlib import Path
16*55e87721SMatt Gilbrideimport shutil
17*55e87721SMatt Gilbridefrom typing import Callable, Iterable, Union, List, Optional
18*55e87721SMatt Gilbrideimport os
19*55e87721SMatt Gilbrideimport re
20*55e87721SMatt Gilbrideimport sys
21*55e87721SMatt Gilbride
22*55e87721SMatt Gilbridefrom synthtool import _tracked_paths
23*55e87721SMatt Gilbridefrom synthtool.log import logger
24*55e87721SMatt Gilbridefrom synthtool import metadata
25*55e87721SMatt Gilbride
# A single filesystem location, given either as a plain string or a Path.
PathOrStr = Union[str, Path]
# Any iterable of such locations (entries may also be glob patterns).
ListOfPathsOrStrs = Iterable[PathOrStr]
28*55e87721SMatt Gilbride
29*55e87721SMatt Gilbride
class MissingSourceError(Exception):
    """Raised by move() when `required` is set and no files were copied."""
32*55e87721SMatt Gilbride
33*55e87721SMatt Gilbride
def _expand_paths(paths: ListOfPathsOrStrs, root: PathOrStr = None) -> Iterable[Path]:
    """Lazily expand a collection of paths/globs into concrete paths.

    Args:
        paths: A single str/Path or an iterable of them. Each entry is treated
            as a glob pattern. ``None`` yields nothing.
        root: Directory that relative patterns are resolved against. Defaults
            to the current directory (``Path(".")``).

    Yields:
        Every path matched by each pattern, in the order the patterns were
        given. For ``str`` patterns, a match equal to the running script
        (``sys.argv[0]``) is skipped; ``Path`` patterns are not filtered.
    """
    if paths is None:
        # This is a generator, so this simply ends the iteration.
        return []

    # Accept a lone path/pattern as well as a collection of them.
    if isinstance(paths, (str, Path)):
        paths = [paths]

    # Normalize root to a Path, defaulting to the current directory.
    root = Path(".") if root is None else Path(root)

    # Name of the running synth script; it must not be transformed.
    synth_script_name = sys.argv[0]

    for pattern in paths:
        if not isinstance(pattern, Path):
            for match in root.glob(pattern):
                if match.absolute() != Path(synth_script_name).absolute():
                    yield match
        elif pattern.is_absolute():
            # Path.glob() needs a relative pattern, so glob from the anchor
            # (filesystem root / drive) using the remainder of the path.
            anchor_dir = Path(pattern.anchor)
            below_anchor = str(pattern.relative_to(pattern.anchor))
            yield from anchor_dir.glob(below_anchor)
        else:
            yield from root.glob(str(pattern))
66*55e87721SMatt Gilbride
67*55e87721SMatt Gilbride
68*55e87721SMatt Gilbridedef _filter_files(paths: Iterable[Path]) -> Iterable[Path]:
69*55e87721SMatt Gilbride    """Returns only the paths that are files (no directories)."""
70*55e87721SMatt Gilbride
71*55e87721SMatt Gilbride    return (path for path in paths if path.is_file() and os.access(path, os.W_OK))
72*55e87721SMatt Gilbride
73*55e87721SMatt Gilbride
74*55e87721SMatt Gilbridedef _merge_file(
75*55e87721SMatt Gilbride    source_path: Path, dest_path: Path, merge: Callable[[str, str, Path], str]
76*55e87721SMatt Gilbride):
77*55e87721SMatt Gilbride    """
78*55e87721SMatt Gilbride    Writes to the destination the result of merging the source with the
79*55e87721SMatt Gilbride    existing destination contents, using the given merge function.
80*55e87721SMatt Gilbride
81*55e87721SMatt Gilbride    The merge function must take three arguments: the source contents, the
82*55e87721SMatt Gilbride    old destination contents, and a Path to the file to be written.
83*55e87721SMatt Gilbride    """
84*55e87721SMatt Gilbride
85*55e87721SMatt Gilbride    with source_path.open("r") as source_file:
86*55e87721SMatt Gilbride        source_text = source_file.read()
87*55e87721SMatt Gilbride
88*55e87721SMatt Gilbride    with dest_path.open("r+") as dest_file:
89*55e87721SMatt Gilbride        dest_text = dest_file.read()
90*55e87721SMatt Gilbride
91*55e87721SMatt Gilbride        final_text = merge(source_text, dest_text, dest_path)
92*55e87721SMatt Gilbride
93*55e87721SMatt Gilbride        # use the source file's file permission mode
94*55e87721SMatt Gilbride        os.chmod(dest_path, os.stat(source_path).st_mode)
95*55e87721SMatt Gilbride        if final_text != dest_text:
96*55e87721SMatt Gilbride            dest_file.seek(0)
97*55e87721SMatt Gilbride            dest_file.write(final_text)
98*55e87721SMatt Gilbride            dest_file.truncate()
99*55e87721SMatt Gilbride        else:
100*55e87721SMatt Gilbride            dest_path.touch()
101*55e87721SMatt Gilbride
102*55e87721SMatt Gilbride
def _copy_dir_to_existing_dir(
    source: Path,
    destination: Path,
    excludes: ListOfPathsOrStrs = None,
    merge: Callable[[str, str, Path], str] = None,
) -> bool:
    """Recursively copy the files under ``source`` into ``destination``.

    Destination sub-directories are created on demand, so directories that
    contain no files are never created. When ``merge`` is given and the
    destination file already exists, the two are combined via _merge_file();
    if that fails, the error is logged and the file is copied over instead.

    Args:
        source: Directory tree to copy from.
        destination: Directory to copy into.
        excludes: Paths to skip. An entry matches when it equals the
            ``_tracked_paths.relativize()`` form of either the file or the
            directory that directly contains it.
        merge: Optional merge callback for files that already exist.

    Returns: True if any files were copied, False otherwise.
    """
    if not excludes:
        excludes = []

    any_copied = False
    for walk_root, _, file_names in os.walk(source):
        for file_name in file_names:
            target_dir = destination / str(Path(walk_root).relative_to(source))
            target_file = target_dir / file_name

            matching_excludes = [
                entry
                for entry in excludes
                if (
                    Path(entry) == _tracked_paths.relativize(walk_root)
                    or Path(entry)
                    == _tracked_paths.relativize(Path(walk_root) / file_name)
                )
            ]
            if matching_excludes:
                continue

            os.makedirs(str(target_dir), exist_ok=True)
            source_path = Path(os.path.join(walk_root, file_name))

            if merge is None or not target_file.is_file():
                shutil.copy2(str(source_path), str(target_file))
            else:
                try:
                    _merge_file(source_path, target_file, merge)
                except Exception:
                    # Best effort: a failed merge degrades to a plain copy.
                    logger.exception(
                        "_merge_file failed for %s, fall back to copy",
                        source_path,
                    )
                    shutil.copy2(str(source_path), str(target_file))
            any_copied = True

    return any_copied
149*55e87721SMatt Gilbride
150*55e87721SMatt Gilbride
def dont_overwrite(
    patterns: ListOfPathsOrStrs,
) -> Callable[[str, str, Path], str]:
    """Returns a merge function that doesn't overwrite the specified files.

    Pass the return value to move() or copy() to avoid overwriting existing
    files.

    Args:
        patterns: Pattern(s) checked with ``Path.match()`` against the
            destination path. A single str/Path is accepted as well as an
            iterable of them.

    Returns:
        A merge callback that returns the existing destination text for
        matching files, and the source text for everything else.
    """
    # A lone pattern must not be iterated character by character.
    if isinstance(patterns, (str, Path)):
        patterns = [patterns]

    # Snapshot the patterns once: the callback runs for every merged file, so
    # a one-shot iterable (e.g. a generator) would otherwise be exhausted
    # after the first file and silently stop protecting the rest.
    preserved_patterns = tuple(str(p) for p in (patterns or ()))

    def merge(source_text: str, destination_text: str, file_path: Path) -> str:
        for pattern in preserved_patterns:
            if file_path.match(pattern):
                logger.debug(f"Preserving existing contents of {file_path}.")
                return destination_text
        return source_text

    return merge
168*55e87721SMatt Gilbride
169*55e87721SMatt Gilbride
def move(
    sources: ListOfPathsOrStrs,
    destination: PathOrStr = None,
    excludes: ListOfPathsOrStrs = None,
    merge: Callable[[str, str, Path], str] = None,
    required: bool = False,
) -> bool:
    """
    Copy the file(s) matched by sources to the destination, preserving file mode.

    Args:
        sources (ListOfPathsOrStrs): Glob pattern(s) to copy
        destination (PathOrStr): Destination folder for copied files. When
            omitted, each source is copied to ``_tracked_paths.relativize(source)``.
        excludes (ListOfPathsOrStrs): Glob pattern(s) of files to skip; they are
            expanded relative to each source.
        merge (Callable[[str, str, Path], str]): Callback function for merging files
            if there is an existing file.
        required (bool): If required and no source files are copied, throws a MissingSourceError

    Returns:
        True if any files were copied, False otherwise.

    Raises:
        MissingSourceError: If ``required`` is set and nothing was copied.
    """
    copied = False

    # Keep the caller's patterns intact. They are re-expanded for every
    # source, so they must neither be consumed (one-shot iterables) nor be
    # replaced by the results of a previous source's expansion.
    if excludes is None:
        exclude_patterns = []
    elif isinstance(excludes, (str, Path)):
        exclude_patterns = [excludes]
    else:
        exclude_patterns = list(excludes)

    for excluded_pattern in exclude_patterns:
        metadata.add_pattern_excluded_during_copy(str(excluded_pattern))

    for source in _expand_paths(sources):
        if destination is None:
            canonical_destination = _tracked_paths.relativize(source)
        else:
            canonical_destination = Path(destination)

        # Exclusions that apply to this particular source.
        source_excludes = [
            _tracked_paths.relativize(e)
            for e in _expand_paths(exclude_patterns, source)
        ]

        if source.is_dir():
            # Always perform the copy. Folding the call into
            # `copied or ...` would short-circuit and skip every directory
            # after the first successful copy.
            if _copy_dir_to_existing_dir(
                source, canonical_destination, excludes=source_excludes, merge=merge
            ):
                copied = True
        elif source not in source_excludes:
            # copy individual file
            if merge is not None and canonical_destination.is_file():
                try:
                    _merge_file(source, canonical_destination, merge)
                except Exception:
                    # Best effort: a failed merge degrades to a plain copy.
                    logger.exception(
                        "_merge_file failed for %s, fall back to copy", source
                    )
                    shutil.copy2(source, canonical_destination)
            else:
                shutil.copy2(source, canonical_destination)
            copied = True

    if not copied:
        if required:
            raise MissingSourceError(
                f"No files in sources {sources} were copied. Does the source "
                f"contain files?"
            )
        else:
            logger.warning(
                f"No files in sources {sources} were copied. Does the source "
                f"contain files?"
            )

    return copied
239*55e87721SMatt Gilbride
240*55e87721SMatt Gilbride
def _replace_in_file(path, expr, replacement):
    """Apply a compiled regex substitution to the file at ``path``.

    The file is first processed as text. If it cannot be decoded, it is
    processed again as bytes with a bytes version of the same pattern.

    Args:
        path: Path of the file to rewrite in place.
        expr: Compiled ``str`` regular expression.
        replacement: Replacement text (``str``).

    Returns:
        The number of substitutions made.
    """
    try:
        with path.open("r+") as text_handle:
            return _replace_in_file_handle(text_handle, expr, replacement)
    except UnicodeDecodeError:
        # Not decodable as text: treat it as binary below.
        pass

    # Bytes patterns cannot carry the UNICODE flag, so strip it.
    bytes_expr = re.compile(expr.pattern.encode(), expr.flags & ~re.UNICODE)
    with path.open("rb+") as binary_handle:
        return _replace_in_file_handle(
            binary_handle, bytes_expr, replacement.encode()
        )
251*55e87721SMatt Gilbride
252*55e87721SMatt Gilbride
253*55e87721SMatt Gilbridedef _replace_in_file_handle(fh, expr, replacement):
254*55e87721SMatt Gilbride    content = fh.read()
255*55e87721SMatt Gilbride    content, count = expr.subn(replacement, content)
256*55e87721SMatt Gilbride
257*55e87721SMatt Gilbride    # Don't bother writing the file if we didn't change
258*55e87721SMatt Gilbride    # anything.
259*55e87721SMatt Gilbride    if count:
260*55e87721SMatt Gilbride        fh.seek(0)
261*55e87721SMatt Gilbride        fh.write(content)
262*55e87721SMatt Gilbride        fh.truncate()
263*55e87721SMatt Gilbride    return count
264*55e87721SMatt Gilbride
265*55e87721SMatt Gilbride
def replace(
    sources: ListOfPathsOrStrs, before: str, after: str, flags: int = re.MULTILINE
) -> int:
    """Replaces occurrences of before with after in all the given sources.

    Args:
        sources: Path(s) or glob pattern(s), expanded relative to the current
            directory. Only writable regular files are processed.
        before: Regular expression to search for.
        after: Replacement text.
        flags: ``re`` flags used to compile ``before``.

    Returns:
      The number of times the text was found and replaced across all files.
    """
    expr = re.compile(before, flags=flags or 0)
    # Materialize the matches: a lazy iterator is always truthy, which would
    # make the emptiness check below impossible to trigger.
    paths = list(_filter_files(_expand_paths(sources, ".")))

    if not paths:
        logger.warning(f"No files were found in sources {sources} for replace()")

    count_replaced = 0
    for path in paths:
        replaced = _replace_in_file(path, expr, after)
        count_replaced += replaced
        if replaced:
            logger.info(f"Replaced {before!r} in {path}.")

    if not count_replaced:
        logger.warning(
            f"No replacements made in {sources} for pattern {before}, maybe "
            "replacement is no longer needed?"
        )
    return count_replaced
293*55e87721SMatt Gilbride
294*55e87721SMatt Gilbride
def get_staging_dirs(
    default_version: Optional[str] = None, staging_path: Optional[str] = None
) -> List[Path]:
    """Returns the list of directories, one per version, copied from
    https://github.com/googleapis/googleapis-gen. Will return in lexical sorting
    order with the exception of the default_version which will be last (if specified).

    Each returned directory is also registered with ``_tracked_paths``.

    Args:
      default_version: the default version of the API. The directory for this version
        will be the last item in the returned list if it exists in the staging
        directory.
      staging_path: the path to the staging directory. Defaults to
        "owl-bot-staging" relative to the current directory.

    Returns: the empty list if no file were copied.
    """
    staging = Path(staging_path) if staging_path else Path("owl-bot-staging")
    if not staging.is_dir():
        return []

    # Collect the subdirectories of the staging directory, in lexical order.
    versions = sorted(entry.name for entry in staging.iterdir() if entry.is_dir())

    # Move the default version to the end, but only when it was actually
    # staged. Appending it unconditionally would return a directory that
    # does not exist.
    if default_version in versions:
        versions.remove(default_version)
        versions.append(default_version)

    staging_dirs = [staging / version for version in versions]
    for staging_dir in staging_dirs:
        _tracked_paths.add(staging_dir)
    return staging_dirs
328*55e87721SMatt Gilbride
329*55e87721SMatt Gilbride
def remove_staging_dirs():
    """Delete the "owl-bot-staging" directory tree, if it exists.

    The path is relative to the current working directory; nothing happens
    when it is not a directory.
    """
    staging = Path("owl-bot-staging")
    if not staging.is_dir():
        return
    shutil.rmtree(staging)
335