# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
import shutil
from typing import Callable, Iterable, Union, List, Optional
import os
import re
import sys

from synthtool import _tracked_paths
from synthtool.log import logger
from synthtool import metadata

PathOrStr = Union[str, Path]
ListOfPathsOrStrs = Iterable[Union[str, Path]]


class MissingSourceError(Exception):
    """Raised by move() when ``required=True`` and nothing was copied."""

    pass


def _expand_paths(
    paths: ListOfPathsOrStrs, root: Optional[PathOrStr] = None
) -> Iterable[Path]:
    """Given a list of globs/paths, expands them into a flat sequence,
    expanding globs as necessary.

    Args:
        paths: A single glob/path or an iterable of them. ``None`` yields
            nothing.
        root: Directory that relative globs are resolved against. Defaults
            to the current directory.

    Yields:
        Every path matched by the globs. String globs never yield the
        running script (``sys.argv[0]``); ``Path`` inputs are not filtered
        that way.
    """
    if paths is None:
        return

    if isinstance(paths, (str, Path)):
        paths = [paths]

    if root is None:
        root = Path(".")

    # ensure root is a path
    root = Path(root)

    # record name of synth script so we don't try to do transforms on it
    synth_script_name = sys.argv[0]

    for path in paths:
        if isinstance(path, Path):
            if path.is_absolute():
                # Path.glob() only accepts relative patterns, so glob from
                # the anchor (e.g. "/") using the rest of the path.
                anchor = Path(path.anchor)
                remainder = str(path.relative_to(path.anchor))
                yield from anchor.glob(remainder)
            else:
                yield from root.glob(str(path))
        else:
            yield from (
                p
                for p in root.glob(path)
                if p.absolute() != Path(synth_script_name).absolute()
            )


def _filter_files(paths: Iterable[Path]) -> Iterable[Path]:
    """Returns only the paths that are writable regular files.

    Directories and files the current process cannot write to are dropped.
    """

    return (path for path in paths if path.is_file() and os.access(path, os.W_OK))


def _merge_file(
    source_path: Path, dest_path: Path, merge: Callable[[str, str, Path], str]
):
    """
    Writes to the destination the result of merging the source with the
    existing destination contents, using the given merge function.

    The merge function must take three arguments: the source contents, the
    old destination contents, and a Path to the file to be written.

    The destination always receives the source file's permission mode. When
    the merged text equals the existing text the file is only touched.
    """

    with source_path.open("r") as source_file:
        source_text = source_file.read()

    with dest_path.open("r+") as dest_file:
        dest_text = dest_file.read()

        final_text = merge(source_text, dest_text, dest_path)

        # use the source file's file permission mode
        os.chmod(dest_path, os.stat(source_path).st_mode)
        if final_text != dest_text:
            # Rewrite in place; truncate() drops any leftover tail when the
            # merged text is shorter than the old contents.
            dest_file.seek(0)
            dest_file.write(final_text)
            dest_file.truncate()
        else:
            dest_path.touch()


def _copy_dir_to_existing_dir(
    source: Path,
    destination: Path,
    excludes: ListOfPathsOrStrs = None,
    merge: Callable[[str, str, Path], str] = None,
) -> bool:
    """
    copies files over existing files to an existing directory
    this function does not copy empty directories.

    Args:
        source: Directory tree to copy from.
        destination: Directory to copy into; subdirectories are created as
            needed.
        excludes: Paths to skip, compared against the
            ``_tracked_paths.relativize``-d directory and file paths.
        merge: Optional callback used instead of a plain copy when the
            destination file already exists.

    Returns: True if any files were copied, False otherwise.
    """
    copied = False

    if not excludes:
        excludes = []
    for root, _, files in os.walk(source):
        # These depend only on the directory, not on the individual file.
        dest_dir = destination / str(Path(root).relative_to(source))
        for name in files:
            dest_path = dest_dir / name
            # relativize() is only evaluated when there is something to
            # compare against, exactly as before.
            excluded = any(
                Path(e) == _tracked_paths.relativize(root)
                or Path(e) == _tracked_paths.relativize(Path(root) / name)
                for e in excludes
            )
            if excluded:
                continue

            os.makedirs(str(dest_dir), exist_ok=True)
            source_path = Path(os.path.join(root, name))
            if merge is not None and dest_path.is_file():
                try:
                    _merge_file(source_path, dest_path, merge)
                except Exception:
                    # Best effort: e.g. binary files cannot be merged as
                    # text, so overwrite the destination instead.
                    logger.exception(
                        "_merge_file failed for %s, fall back to copy",
                        source_path,
                    )
                    shutil.copy2(str(source_path), str(dest_path))
            else:
                shutil.copy2(str(source_path), str(dest_path))
            copied = True

    return copied


def dont_overwrite(
    patterns: ListOfPathsOrStrs,
) -> Callable[[str, str, Path], str]:
    """Returns a merge function that doesn't overwrite the specified files.

    Pass the return value to move() or copy() to avoid overwriting existing
    files.

    Args:
        patterns: Glob pattern(s) matched with ``Path.match`` against the
            destination path of each file being merged.
    """
    # Snapshot the patterns: the merge callback runs once per file, so a
    # one-shot iterable would otherwise be exhausted after the first file.
    preserved_patterns = [str(pattern) for pattern in patterns]

    def merge(source_text: str, destinaton_text: str, file_path: Path) -> str:
        for pattern in preserved_patterns:
            if file_path.match(pattern):
                logger.debug(f"Preserving existing contents of {file_path}.")
                return destinaton_text
        return source_text

    return merge


def move(
    sources: ListOfPathsOrStrs,
    destination: PathOrStr = None,
    excludes: ListOfPathsOrStrs = None,
    merge: Callable[[str, str, Path], str] = None,
    required: bool = False,
) -> bool:
    """
    copy file(s) at source to current directory, preserving file mode.

    Args:
        sources (ListOfPathsOrStrs): Glob pattern(s) to copy
        destination (PathOrStr): Destination folder for copied files
        excludes (ListOfPathsOrStrs): Glob pattern(s) of files to skip
        merge (Callable[[str, str, Path], str]): Callback function for merging files
            if there is an existing file.
        required (bool): If required and no source files are copied, throws a MissingSourceError

    Returns:
        True if any files were copied, False otherwise.

    Raises:
        MissingSourceError: If ``required`` is set and nothing was copied.
    """
    copied = False

    # Normalize the exclude patterns once. They are walked for the metadata
    # below and again for every source, so they must not be a one-shot
    # iterator, and a bare string must count as one pattern, not characters.
    if not excludes:
        exclude_patterns: List[PathOrStr] = []
    elif isinstance(excludes, (str, Path)):
        exclude_patterns = [excludes]
    else:
        exclude_patterns = list(excludes)

    for excluded_pattern in exclude_patterns:
        metadata.add_pattern_excluded_during_copy(str(excluded_pattern))

    for source in _expand_paths(sources):
        if destination is None:
            canonical_destination = _tracked_paths.relativize(source)
        else:
            canonical_destination = Path(destination)

        # Expand the patterns against *this* source. Keeping the result in a
        # per-iteration variable leaves the caller's patterns intact for the
        # next source.
        source_excludes = [
            _tracked_paths.relativize(e)
            for e in _expand_paths(exclude_patterns, source)
        ]

        if source.is_dir():
            # Always perform the copy; folding the call into
            # `copied or ...` would skip it once anything had been copied.
            dir_copied = _copy_dir_to_existing_dir(
                source, canonical_destination, excludes=source_excludes, merge=merge
            )
            copied = copied or dir_copied
        elif source not in source_excludes:
            # copy individual file
            if merge is not None and canonical_destination.is_file():
                try:
                    _merge_file(source, canonical_destination, merge)
                except Exception:
                    logger.exception(
                        "_merge_file failed for %s, fall back to copy", source
                    )
                    shutil.copy2(source, canonical_destination)
            else:
                shutil.copy2(source, canonical_destination)
            copied = True

    if not copied:
        if required:
            raise MissingSourceError(
                f"No files in sources {sources} were copied. Does the source "
                f"contain files?"
            )
        else:
            logger.warning(
                f"No files in sources {sources} were copied. Does the source "
                f"contain files?"
            )

    return copied


def _replace_in_file(path, expr, replacement):
    """Applies ``expr`` -> ``replacement`` to the file at ``path``.

    The file is first read as text. If decoding fails it is treated as
    binary and the substitution is retried with a bytes pattern.

    Returns:
        The number of substitutions made.
    """
    try:
        with path.open("r+") as fh:
            return _replace_in_file_handle(fh, expr, replacement)
    except UnicodeDecodeError:
        pass  # It's a binary file.  Try again with a binary regular expression.
    # re.UNICODE is not valid for bytes patterns, so strip it.
    flags = expr.flags & ~re.UNICODE
    expr = re.compile(expr.pattern.encode(), flags)
    with path.open("rb+") as fh:
        return _replace_in_file_handle(fh, expr, replacement.encode())


def _replace_in_file_handle(fh, expr, replacement):
    """Substitutes ``expr`` with ``replacement`` in an open read/write handle.

    Returns:
        The number of substitutions made.
    """
    content = fh.read()
    content, count = expr.subn(replacement, content)

    # Don't bother writing the file if we didn't change
    # anything.
    if count:
        fh.seek(0)
        fh.write(content)
        fh.truncate()
    return count


def replace(
    sources: ListOfPathsOrStrs, before: str, after: str, flags: int = re.MULTILINE
) -> int:
    """Replaces occurrences of before with after in all the given sources.

    Args:
        sources: Glob pattern(s), relative to the current directory, of the
            files to edit. Only writable regular files are touched.
        before: Regular expression to search for.
        after: Replacement text (may use regex group references).
        flags: ``re`` flags used to compile ``before``.

    Returns:
        The number of times the text was found and replaced across all files.
    """
    expr = re.compile(before, flags=flags or 0)
    # Materialize the matches: a generator is always truthy, which would make
    # the emptiness check below unreachable.
    paths = list(_filter_files(_expand_paths(sources, ".")))

    if not paths:
        logger.warning(f"No files were found in sources {sources} for replace()")

    count_replaced = 0
    for path in paths:
        replaced = _replace_in_file(path, expr, after)
        count_replaced += replaced
        if replaced:
            logger.info(f"Replaced {before!r} in {path}.")

    if not count_replaced:
        logger.warning(
            f"No replacements made in {sources} for pattern {before}, maybe "
            "replacement is no longer needed?"
        )
    return count_replaced


def get_staging_dirs(
    default_version: Optional[str] = None, staging_path: Optional[str] = None
) -> List[Path]:
    """Returns the list of directories, one per version, copied from
    https://github.com/googleapis/googleapis-gen. Will return in lexical sorting
    order with the exception of the default_version which will be last (if specified).

    Each returned directory is also registered with ``_tracked_paths``.

    Args:
      default_version: the default version of the API. The directory for this version
        will be the last item in the returned list if specified. It is appended
        whether or not a matching subdirectory exists.
      staging_path: the path to the staging directory. Defaults to
        ``owl-bot-staging``.

    Returns: the empty list if no file were copied.
    """

    if staging_path:
        staging = Path(staging_path)
    else:
        staging = Path("owl-bot-staging")
    if staging.is_dir():
        # Collect the subdirectories of the staging directory.
        versions = [v.name for v in staging.iterdir() if v.is_dir()]
        # Reorder the versions so the default version always comes last.
        versions = [v for v in versions if v != default_version]
        versions.sort()
        if default_version is not None:
            versions += [default_version]
        dirs = [staging / v for v in versions]
        for staging_dir in dirs:
            _tracked_paths.add(staging_dir)
        return dirs
    else:
        return []


def remove_staging_dirs():
    """Removes all the staging directories."""
    staging = Path("owl-bot-staging")
    if staging.is_dir():
        shutil.rmtree(staging)