"""
A structured logging utility supporting multiple data formats such as CSV, JSON,
and YAML.

The main purpose of this script is to keep relevant information available in a
condensed and easily deserialized form.

This script defines a protocol for different file handling strategies and provides
implementations for CSV, JSON, and YAML formats. The main class, StructuredLogger,
allows for easy interaction with log data, enabling users to load, save, increment,
set, and append fields in the log. The script also includes context managers for
file locking and editing log data to ensure data integrity and avoid race conditions.
"""

import json
import os
from collections.abc import MutableMapping, MutableSequence
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Protocol

import fire
from filelock import FileLock

# polars and ruamel.yaml are optional: the import error is remembered and only
# re-raised (as the cause of a RuntimeError) when the matching strategy is
# instantiated.
try:
    import polars as pl

    CSV_LIB_EXCEPTION = None
except ImportError as e:
    CSV_LIB_EXCEPTION: ImportError = e

try:
    from ruamel.yaml import YAML

    YAML_LIB_EXCEPTION = None
except ImportError as e:
    YAML_LIB_EXCEPTION: ImportError = e


class ContainerProxy:
    """
    A proxy class that wraps a mutable container object (such as a dictionary or
    a list) and calls a provided save_callback function whenever the container
    or its contents are changed.

    Item assignment and deletion trigger the callback directly. Any callable
    attribute fetched from the wrapped container (e.g. ``append``, ``update``)
    is returned wrapped so that the callback runs after every call to it; the
    proxy does not distinguish mutating methods from read-only ones.
    """

    def __init__(self, container, save_callback):
        """
        Args:
            container: The mutable mapping or sequence to wrap.
            save_callback: Zero-argument callable invoked after each change.
        """
        self.container = container
        self.save_callback = save_callback

    def __getitem__(self, key):
        """Return ``container[key]``, proxying nested mutable containers."""
        value = self.container[key]
        if isinstance(value, (MutableMapping, MutableSequence)):
            # Keep nested containers observable with the same callback.
            return ContainerProxy(value, self.save_callback)
        return value

    def __setitem__(self, key, value):
        """Assign ``container[key] = value`` and invoke the save callback."""
        self.container[key] = value
        self.save_callback()

    def __delitem__(self, key):
        """Delete ``container[key]`` and invoke the save callback."""
        del self.container[key]
        self.save_callback()

    def __getattr__(self, name):
        """
        Delegate unknown attributes to the wrapped container.

        Callable attributes are wrapped so the save callback runs after each
        call. Non-callable attributes are returned unchanged.

        Raises:
            AttributeError: If the container has no such attribute, or if the
                proxy itself has not been given a ``container`` yet.
        """
        # __getattr__ only runs when normal lookup fails. If "container" itself
        # is missing (instance created without __init__, e.g. by copy/pickle),
        # touching self.container below would re-enter this method forever.
        if name == "container":
            raise AttributeError(name)

        attr = getattr(self.container, name)

        if callable(attr):

            def wrapper(*args, **kwargs):
                result = attr(*args, **kwargs)
                self.save_callback()
                return result

            return wrapper
        return attr

    def __iter__(self):
        return iter(self.container)

    def __len__(self):
        return len(self.container)

    def __repr__(self):
        return repr(self.container)


class AutoSaveDict(dict):
    """
    A subclass of the built-in dict class with additional functionality to
    automatically save changes to the dictionary. It maintains a timestamp of
    the last modification and automatically wraps nested mutable containers
    using ContainerProxy.

    NOTE(review): only ``__setitem__``, ``__delitem__``, ``pop`` and ``update``
    are overridden here; other dict mutators (``setdefault``, ``popitem``,
    ``clear``) and ``get`` are inherited unchanged from ``dict``.
    """

    timestamp_key = "_timestamp"

    def __init__(self, *args, save_callback, register_timestamp=True, **kwargs):
        """
        Args:
            *args: Positional arguments forwarded to ``dict``.
            save_callback: Zero-argument callable invoked after each change.
            register_timestamp: When true, the ``_timestamp`` entry is refreshed
                with the current local time before every save.
            **kwargs: Keyword arguments forwarded to ``dict``.
        """
        self.save_callback = save_callback
        self.__register_timestamp = register_timestamp
        # The heartbeat runs before the dict is populated, so a "_timestamp"
        # entry present in the initial data overrides the one written here.
        self.__heartbeat()
        super().__init__(*args, **kwargs)
        self.__wrap_dictionaries()

    def __heartbeat(self):
        """Store the current time under ``timestamp_key`` when enabled."""
        if self.__register_timestamp:
            self[AutoSaveDict.timestamp_key] = datetime.now().isoformat()

    def __save(self):
        """Refresh the timestamp, then invoke the save callback."""
        self.__heartbeat()
        self.save_callback()

    def __wrap_dictionaries(self):
        """Convert nested plain mappings into timestamp-less AutoSaveDicts."""
        # Only values of existing keys are replaced, so the dict does not change
        # size while it is being iterated.
        for key, value in self.items():
            if isinstance(value, MutableMapping) and not isinstance(
                value, AutoSaveDict
            ):
                self[key] = AutoSaveDict(
                    value, save_callback=self.save_callback, register_timestamp=False
                )

    def __setitem__(self, key, value):
        """Store ``value`` (wrapping plain mappings) and save."""
        if isinstance(value, MutableMapping) and not isinstance(value, AutoSaveDict):
            value = AutoSaveDict(
                value, save_callback=self.save_callback, register_timestamp=False
            )
        super().__setitem__(key, value)

        # Writing the timestamp is part of a save already in progress (or of
        # construction); saving again here would recurse through __heartbeat.
        if self.__register_timestamp and key == AutoSaveDict.timestamp_key:
            return
        self.__save()

    def __getitem__(self, key):
        """Return the value, proxying mutable containers so edits are saved."""
        value = super().__getitem__(key)
        if isinstance(value, (MutableMapping, MutableSequence)):
            return ContainerProxy(value, self.__save)
        return value

    def __delitem__(self, key):
        """Delete ``key`` and save."""
        super().__delitem__(key)
        self.__save()

    def pop(self, *args, **kwargs):
        """Same as ``dict.pop``, followed by a save when it does not raise."""
        result = super().pop(*args, **kwargs)
        self.__save()
        return result

    def update(self, *args, **kwargs):
        """Same as ``dict.update``; wraps new nested mappings, then saves."""
        super().update(*args, **kwargs)
        self.__wrap_dictionaries()
        self.__save()


class StructuredLoggerStrategy(Protocol):
    """Interface every file format strategy must provide."""

    def load_data(self, file_path: Path) -> dict:
        """Read ``file_path`` and return its content as a dictionary."""

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path``."""


class CSVStrategy:
    """CSV persistence backed by polars."""

    def __init__(self) -> None:
        """
        Raises:
            RuntimeError: If polars could not be imported.
        """
        if CSV_LIB_EXCEPTION:
            raise RuntimeError(
                "Can't parse CSV files. Missing library"
            ) from CSV_LIB_EXCEPTION

    def load_data(self, file_path: Path) -> dict:
        """
        Read a CSV file and fold its rows into a single dictionary.

        A column that appears in one row only keeps its scalar value; from the
        second row on its values are collected in a list. The timestamp column
        is never collected: it keeps the value of the last row read.
        """
        dicts: list[dict[str, Any]] = pl.read_csv(
            file_path, try_parse_dates=True
        ).to_dicts()
        data = {}
        for d in dicts:
            for k, v in d.items():
                if k != AutoSaveDict.timestamp_key and k in data:
                    if isinstance(data[k], list):
                        data[k].append(v)
                        continue
                    # Second occurrence: promote the scalar to a list.
                    data[k] = [data[k], v]
                else:
                    data[k] = v
        return data

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path`` as CSV through a polars DataFrame."""
        pl.DataFrame(data).write_csv(file_path)


class JSONStrategy:
    """JSON persistence using the standard library."""

    def load_data(self, file_path: Path) -> dict:
        """Parse ``file_path`` as JSON and return the result."""
        return json.loads(file_path.read_text())

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path`` as JSON indented by two spaces."""
        with open(file_path, "w") as f:
            json.dump(data, f, indent=2)


class YAMLStrategy:
    """YAML persistence backed by ruamel.yaml."""

    def __init__(self):
        """
        Raises:
            RuntimeError: If ruamel.yaml could not be imported.
        """
        if YAML_LIB_EXCEPTION:
            raise RuntimeError(
                "Can't parse YAML files. Missing library"
            ) from YAML_LIB_EXCEPTION
        self.yaml = YAML()
        self.yaml.indent(sequence=4, offset=2)
        self.yaml.default_flow_style = False
        # Register AutoSaveDict so it is emitted as a plain YAML mapping.
        self.yaml.representer.add_representer(AutoSaveDict, self.represent_dict)

    @classmethod
    def represent_dict(cls, dumper, data):
        """Represent ``data`` using the standard YAML mapping tag."""
        return dumper.represent_mapping("tag:yaml.org,2002:map", data)

    def load_data(self, file_path: Path) -> dict:
        """Parse ``file_path`` as YAML and return the result."""
        return self.yaml.load(file_path.read_text())

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path`` as YAML."""
        with open(file_path, "w") as f:
            self.yaml.dump(data, f)


class StructuredLogger:
    """Keeps a dictionary of log data synchronized with a file on disk."""

    def __init__(
        self,
        file_name: str,
        strategy: Optional[StructuredLoggerStrategy] = None,
        truncate=False,
    ):
        """
        Args:
            file_name: Path of the log file.
            strategy: File format strategy. When omitted it is guessed from the
                extension of ``file_name``.
            truncate: When true and the file already exists, empty it (under
                the file lock) and write the fresh in-memory data.

        If the file does not exist, its parent directories are created and the
        initial data is written. An existing file is neither read nor modified
        here unless ``truncate`` is set.

        Raises:
            ValueError: If no strategy is given and the extension is unknown.
        """
        self.file_name: str = file_name
        self.file_path = Path(self.file_name)
        self._data: AutoSaveDict = AutoSaveDict(save_callback=self.save_data)

        if strategy is None:
            self.strategy: StructuredLoggerStrategy = self.guess_strategy_from_file(
                self.file_path
            )
        else:
            self.strategy = strategy

        if not self.file_path.exists():
            # parents=True: the log may live several not-yet-existing
            # directories deep.
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            self.save_data()
            return

        if truncate:
            with self.get_lock():
                os.truncate(self.file_path, 0)
                self.save_data()

    def load_data(self):
        """Replace the in-memory data with the content of the log file."""
        # NOTE(review): the strategy's return value is stored as-is, so after a
        # load `data` is no longer an AutoSaveDict and later changes are only
        # persisted by an explicit save_data() (edit_context does that on
        # exit). Confirm this is intended.
        self._data = self.strategy.load_data(self.file_path)

    def save_data(self):
        """Write the in-memory data to the log file using the strategy."""
        self.strategy.save_data(self.file_path, self._data)

    @property
    def data(self) -> AutoSaveDict:
        """The in-memory log data."""
        return self._data

    @contextmanager
    def get_lock(self):
        """Hold the ``<file_path>.lock`` file lock (timeout=10) for the body."""
        with FileLock(f"{self.file_path}.lock", timeout=10):
            yield

    @contextmanager
    def edit_context(self):
        """
        Context manager that ensures proper loading and saving of log data when
        performing multiple modifications.

        The file lock is held for the whole body. The data is saved in a
        ``finally`` clause, so the save also runs when loading or the body
        raises.
        """
        with self.get_lock():
            try:
                self.load_data()
                yield
            finally:
                self.save_data()

    @staticmethod
    def guess_strategy_from_file(file_path: Path) -> StructuredLoggerStrategy:
        """Return a strategy instance chosen by the file extension."""
        file_extension = file_path.suffix.lower().lstrip(".")
        return StructuredLogger.get_strategy(file_extension)

    @staticmethod
    def get_strategy(strategy_name: str) -> StructuredLoggerStrategy:
        """
        Instantiate the strategy registered under ``strategy_name``.

        Raises:
            ValueError: If ``strategy_name`` is not one of csv, json, yaml, yml.
            RuntimeError: If the library required by the strategy is missing.
        """
        strategies = {
            "csv": CSVStrategy,
            "json": JSONStrategy,
            "yaml": YAMLStrategy,
            "yml": YAMLStrategy,
        }

        # Only the lookup is guarded, so an error raised while constructing the
        # strategy is never mistaken for an unknown name.
        try:
            strategy_cls = strategies[strategy_name]
        except KeyError as e:
            raise ValueError(f"Unknown strategy for: {strategy_name}") from e
        return strategy_cls()


if __name__ == "__main__":
    fire.Fire(StructuredLogger)