# Copyright (C) 2020 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module to check updates from crates.io.""" import json import os from pathlib import Path import re import shutil import tempfile import urllib.request from typing import IO import archive_utils from base_updater import Updater import git_utils # pylint: disable=import-error import metadata_pb2 # type: ignore import updater_utils LIBRARY_NAME_PATTERN: str = r"([-\w]+)" ALPHA_BETA_PATTERN: str = r"^.*[0-9]+\.[0-9]+\.[0-9]+-(alpha|beta).*" ALPHA_BETA_RE: re.Pattern = re.compile(ALPHA_BETA_PATTERN) """Match both x.y.z and x.y.z+a.b.c which is used by some Vulkan binding libraries""" VERSION_PATTERN: str = r"([0-9]+)\.([0-9]+)\.([0-9]+)(\+([0-9]+)\.([0-9]+)\.([0-9]+))?" VERSION_RE: re.Pattern = re.compile(VERSION_PATTERN) CRATES_IO_ARCHIVE_URL_PATTERN: str = (r"^https:\/\/static.crates.io\/crates\/" + LIBRARY_NAME_PATTERN + "/" + LIBRARY_NAME_PATTERN + "-" + "(.*?)" + ".crate") CRATES_IO_ARCHIVE_URL_RE: re.Pattern = re.compile(CRATES_IO_ARCHIVE_URL_PATTERN) DESCRIPTION_PATTERN: str = r"^description *= *(\".+\")" DESCRIPTION_RE: re.Pattern = re.compile(DESCRIPTION_PATTERN) class CratesUpdater(Updater): """Updater for crates.io packages.""" UPSTREAM_REMOTE_NAME: str = "update_origin" download_url: str package: str package_dir: str temp_file: IO def is_supported_url(self) -> bool: match = CRATES_IO_ARCHIVE_URL_RE.match(self._old_identifier.value) if match is None: return False self.package = match.group(1) return True def setup_remote(self) -> None: url = "https://crates.io/api/v1/crates/" + self.package with urllib.request.urlopen(url) as request: data = json.loads(request.read().decode()) homepage = data["crate"]["repository"] remotes = git_utils.list_remotes(self._proj_path) current_remote_url = None for name, url in remotes.items(): if name == self.UPSTREAM_REMOTE_NAME: current_remote_url = url if current_remote_url is not None and current_remote_url != homepage: git_utils.remove_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME) current_remote_url = None if current_remote_url is None: git_utils.add_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME, homepage) branch = git_utils.detect_default_branch(self._proj_path, self.UPSTREAM_REMOTE_NAME) git_utils.fetch(self._proj_path, self.UPSTREAM_REMOTE_NAME, branch) def _get_version_numbers(self, version: str) -> tuple[int, int, int]: match = VERSION_RE.match(version) if match is not None: return ( int(match.group(1)), int(match.group(2)), int(match.group(3)), ) return (0, 0, 0) def _is_newer_version(self, prev_version: str, prev_id: int, check_version: str, check_id: int): """Return true if check_version+id is newer than prev_version+id.""" return ((self._get_version_numbers(check_version), check_id) > (self._get_version_numbers(prev_version), prev_id)) def _find_latest_non_test_version(self) -> None: url = f"https://crates.io/api/v1/crates/{self.package}/versions" with urllib.request.urlopen(url) as request: data = json.loads(request.read().decode()) last_id = 0 self._new_identifier.version = "" for v in data["versions"]: version = v["num"] if (not v["yanked"] and not ALPHA_BETA_RE.match(version) and self._is_newer_version( self._new_identifier.version, last_id, version, int(v["id"]))): last_id = int(v["id"]) self._new_identifier.version = version self.download_url = "https://crates.io" + v["dl_path"] def check(self) -> None: """Checks crates.io and returns whether a new version is available.""" url = "https://crates.io/api/v1/crates/" + self.package with urllib.request.urlopen(url) as request: data = json.loads(request.read().decode()) self._new_identifier.version = data["crate"]["max_version"] # Skip d.d.d-{alpha,beta}* versions if ALPHA_BETA_RE.match(self._new_identifier.version): print(f"Ignore alpha or beta release:{self.package}-{self._new_identifier.version}.") self._find_latest_non_test_version() else: url = url + "/" + self._new_identifier.version with urllib.request.urlopen(url) as request: data = json.loads(request.read().decode()) self.download_url = "https://crates.io" + data["version"]["dl_path"] def set_new_version_to_old(self): super().refresh_without_upgrading() # A shortcut to use the static download path. self.download_url = f"https://static.crates.io/crates/{self.package}/" \ f"{self.package}-{self._new_identifier.version}.crate" def update(self) -> None: """Updates the package. Has to call check() before this function. """ try: temporary_dir = archive_utils.download_and_extract(self.download_url) self.package_dir = archive_utils.find_archive_root(temporary_dir) self.temp_file = tempfile.NamedTemporaryFile() updater_utils.replace_package(self.package_dir, self._proj_path, self.temp_file.name) self.check_for_errors() finally: urllib.request.urlcleanup() def rollback(self) -> bool: # Only rollback if we have already swapped, # which we denote by writing to this file. if os.fstat(self.temp_file.fileno()).st_size > 0: tmp_dir = tempfile.TemporaryDirectory() shutil.move(self._proj_path, tmp_dir.name) shutil.move(self.package_dir, self._proj_path) shutil.move(Path(tmp_dir.name) / self.package, self.package_dir) return True return False def update_metadata(self, metadata: metadata_pb2.MetaData) -> metadata_pb2: """Updates METADATA content.""" # copy only HOMEPAGE url, and then add new ARCHIVE url. updated_metadata = super().update_metadata(metadata) for identifier in updated_metadata.third_party.identifier: if identifier.version: identifier.value = f"https://static.crates.io/crates/" \ f"{updated_metadata.name}/"\ f"{updated_metadata.name}" \ f"-{self.latest_identifier.version}.crate" break # copy description from Cargo.toml to METADATA cargo_toml = os.path.join(self.project_path, "Cargo.toml") description = self._get_cargo_description(cargo_toml) if description and description != updated_metadata.description: print("New METADATA description:", description) updated_metadata.description = description return updated_metadata def check_for_errors(self) -> None: # Check for .rej patches from failing to apply patches. # If this has too many false positives, we could either # check if the files are modified by patches or somehow # track which files existed before the patching. rejects = list(self._proj_path.glob('**/*.rej')) if len(rejects) > 0: print(f"Error: Found patch reject files: {str(rejects)}") self._has_errors = True def _toml2str(self, line: str) -> str: """Convert a quoted toml string to a Python str without quotes.""" if line.startswith("\"\"\""): return "" # cannot handle broken multi-line description # TOML string escapes: \b \t \n \f \r \" \\ (no unicode escape) line = line[1:-1].replace("\\\\", "\n").replace("\\b", "") line = line.replace("\\t", " ").replace("\\n", " ").replace("\\f", " ") line = line.replace("\\r", "").replace("\\\"", "\"").replace("\n", "\\") # replace a unicode quotation mark, used in the libloading crate return line.replace("’", "'").strip() def _get_cargo_description(self, cargo_toml: str) -> str: """Return the description in Cargo.toml or empty string.""" if os.path.isfile(cargo_toml) and os.access(cargo_toml, os.R_OK): with open(cargo_toml, "r", encoding="utf-8") as toml_file: for line in toml_file: match = DESCRIPTION_RE.match(line) if match: return self._toml2str(match.group(1)) return ""