xref: /aosp_15_r20/external/toolchain-utils/llvm_tools/atomic_write_file.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2023 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Atomic file writing utilities.
6
7Provides atomic_write(...), which allows atomically replacing the contents
8of a file.
9"""
10
11import contextlib
12import logging
13import os
14from pathlib import Path
15import tempfile
16from typing import Iterator, Literal, Optional, Union
17
18
19@contextlib.contextmanager
20def atomic_write(
21    fp: Union[Path, str],
22    mode: Literal["w", "wb"] = "w",
23    encoding: Optional[str] = None,
24) -> Iterator:
25    """Write to a filepath atomically.
26
27    This works by a temp file swap, created with a .tmp suffix in
28    the same directory briefly until being renamed to the desired
29    filepath.
30
31    In the event an exception is raised during the write, the
32    temporary file is deleted and the original filepath is untouched.
33
34    Examples:
35        >>> with atomic_write("my_file.txt", encoding="utf-8") as f:
36        >>>     f.write("Hello world!")
37        >>>     # my_file.txt is still unmodified
38        >>> # "f" is closed here, and my_file.txt is written to.
39
40    Args:
41        fp: Filepath to open.
42        mode: File mode; can be 'w', 'wb'. Default 'w'.
43        encoding: the encoding to use (defaults to None).
44
45    Raises:
46        ValueError when the mode is invalid.
47    """
48    if isinstance(fp, str):
49        fp = Path(fp)
50    if mode not in ("w", "wb"):
51        raise ValueError(f"mode {mode} not accepted")
52
53    # We use mkstemp here because we want to handle the closing and
54    # replacement ourselves.
55    result = tempfile.mkstemp(
56        prefix=fp.name,
57        suffix=".tmp",
58        dir=fp.parent,
59    )
60    fd, tmp_path = (result[0], Path(result[1]))
61
62    try:
63        with os.fdopen(fd, mode=mode, encoding=encoding) as f:
64            yield f
65    except:
66        try:
67            tmp_path.unlink()
68        except Exception as e:
69            logging.exception("unexpected error removing temporary file %s", e)
70        raise
71    tmp_path.replace(fp)
72