# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Sensor schema validation tooling."""

from collections.abc import Sequence
import copy
import importlib.resources
import logging
from pathlib import Path

import jsonschema  # type: ignore
import jsonschema.exceptions  # type: ignore
import yaml

_METADATA_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "metadata_schema.json")
)

_DEPENDENCY_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "dependency_schema.json")
)

_RESOLVED_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "resolved_schema.json")
)


class Validator:
    """
    Context used for validating metadata dictionaries.

    What the validator is:
    - A system to resolve and verify that declared sensor metadata is well
      defined and formatted
    - A utility to resolve any and all dependencies when using a specified
      metadata file

    What the validator is NOT:
    - A code generator
    """

    def __init__(
        self,
        include_paths: Sequence[Path] | None = None,
        log_level: int = logging.WARNING,
    ) -> None:
        """
        Construct a Validator with some context of the current run.

        Args:
            include_paths: An optional list of directories in which to resolve
                dependencies
            log_level: A desired logging level (defaults to logging.WARNING)
        """
        self._include_paths = include_paths if include_paths else []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._logger.setLevel(log_level)

    def validate(self, metadata: dict) -> dict:
        """
        Accept a structured metadata description. This dictionary should
        first pass the schema provided in metadata_schema.json. Then, every
        channel used by the sensor should be defined in exactly one of the
        dependencies. Example YAML:

            deps:
              - "pw_sensor/channels.yaml"
            compatible:
              org: "Bosch"
              part: "BMA4xx"
            supported-buses:
              - i2c
            channels:
              acceleration: []
              die_temperature: []

        Args:
            metadata: Structured sensor data; this will NOT be modified

        Returns:
            A set of attributes, channels, triggers, and units along with a
            single sensor which match the schema in resolved_schema.json.

        Raises:
            RuntimeError: An error in the schema validation or a missing
                definition.
            FileNotFoundError: One of the dependencies was not found.
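
        Example (an illustrative sketch; "bma4xx.yaml" stands in for a file
        holding metadata like the YAML above, and "definitions" for a
        directory holding the dependency files):

            validator = Validator(include_paths=[Path("definitions")])
            with open("bma4xx.yaml", encoding="utf-8") as metadata_file:
                result = validator.validate(yaml.safe_load(metadata_file))

        For the metadata above, the resolved sensor lands at
        result["sensors"]["Bosch,BMA4xx"].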
        """
        result: dict = {
            "attributes": {},
            "channels": {},
            "triggers": {},
            "units": {},
            "sensors": {},
        }
        # Deep copy so that resolving channels/attributes/triggers below
        # cannot mutate the caller's nested entries
        metadata = copy.deepcopy(metadata)

        # Validate the incoming schema
        try:
            jsonschema.validate(instance=metadata, schema=_METADATA_SCHEMA)
        except jsonschema.exceptions.ValidationError as e:
            raise RuntimeError(
                "ERROR: Malformed sensor metadata YAML:\n"
                f"{yaml.safe_dump(metadata, indent=2)}"
            ) from e

        # Resolve all the dependencies; after this, 'result' will have all
        # the missing properties for which defaults can be provided
        self._resolve_dependencies(metadata=metadata, out=result)

        self._logger.debug(
            "Resolved dependencies:\n%s", yaml.safe_dump(result, indent=2)
        )

        # Resolve all channel entries
        self._resolve_channels(metadata=metadata, out=result)

        # Resolve all attribute entries
        self._resolve_attributes(metadata=metadata, out=result)

        # Resolve all trigger entries
        self._resolve_triggers(metadata=metadata, out=result)

        compatible = metadata.pop("compatible")
        supported_buses = metadata.pop("supported-buses")
        channels = metadata.pop("channels")
        attributes = metadata.pop("attributes")
        triggers = metadata.pop("triggers")
        result["sensors"][f"{compatible['org']},{compatible['part']}"] = {
            "compatible": compatible,
            "supported-buses": supported_buses,
            "channels": channels,
            "attributes": attributes,
            "triggers": triggers,
            "description": metadata.get("description", ""),
            "extras": metadata.get("extras", {}),
        }

        # Validate the final output before returning
        try:
            jsonschema.validate(instance=result, schema=_RESOLVED_SCHEMA)
        except jsonschema.exceptions.ValidationError as e:
            raise RuntimeError(
                "ERROR: Malformed output YAML: "
                f"{yaml.safe_dump(result, indent=2)}"
            ) from e

        return result

    def _resolve_dependencies(self, metadata: dict, out: dict) -> None:
        """
        Given a list of dependencies, ensure that each of them exists and
        matches the schema provided in dependency_schema.json. Once loaded,
        the content of the definition file will be resolved (filling in any
        missing fields that can be inherited) and the final result will be
        placed in the 'out' dictionary.

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: Output dictionary where the resolved dependencies should be
                stored

        Raises:
            RuntimeError: An error in the schema validation or a missing
                definition.
            FileNotFoundError: One of the dependencies was not found.
        """
        deps: None | list[str] = metadata.get("deps")
        if not deps:
            self._logger.debug("No dependencies found, skipping imports")
            return

        merged_deps: dict = {
            "attributes": {},
            "channels": {},
            "triggers": {},
            "units": {},
        }
        for dep in deps:
            # Load each of the dependencies, then merge them. This avoids any
            # include dependency order issues.
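            # Note: _merge_deps asserts that each key is declared by at most
            # one dependency, so conflicting definitions across files fail
            # loudly instead of silently overwriting each other.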
            dep_file = self._get_dependency_file(dep)
            with open(dep_file, mode="r", encoding="utf-8") as dep_yaml_file:
                dep_yaml = yaml.safe_load(dep_yaml_file)
                try:
                    jsonschema.validate(
                        instance=dep_yaml, schema=_DEPENDENCY_SCHEMA
                    )
                except jsonschema.exceptions.ValidationError as e:
                    raise RuntimeError(
                        "ERROR: Malformed dependency YAML: "
                        f"{yaml.safe_dump(dep_yaml, indent=2)}"
                    ) from e
                # Merge all the loaded values into 'merged_deps'
                for category in merged_deps:
                    self._merge_deps(
                        category=category,
                        dep_yaml=dep_yaml,
                        merged_deps=merged_deps,
                    )
        # Backfill any default values from the merged dependencies and put
        # them into 'out'
        self._backfill_declarations(declarations=merged_deps, out=out)

    @staticmethod
    def _merge_deps(category: str, dep_yaml: dict, merged_deps: dict) -> None:
        """
        Pull all properties from dep_yaml[category] and put them into
        merged_deps after validating that no duplicate keys exist.

        Args:
            category: The key of dep_yaml and merged_deps to merge
            dep_yaml: The newly loaded dependency YAML
            merged_deps: The accumulated dependency map
        """
        for key, value in dep_yaml.get(category, {}).items():
            assert (
                key not in merged_deps[category]
            ), f"'{key}' was already found under '{category}'"
            merged_deps[category][key] = value

    def _backfill_declarations(self, declarations: dict, out: dict) -> None:
        """
        Add any missing properties of a declaration object.

        Args:
            declarations: The top level declarations dictionary loaded from
                the dependency file.
            out: The already resolved map of all defined dependencies
        """
        # Units are backfilled first because channels assert that their
        # units are already defined
        self._backfill_units(declarations=declarations, out=out)
        self._backfill_channels(declarations=declarations, out=out)
        self._backfill_attributes(declarations=declarations, out=out)
        self._backfill_triggers(declarations=declarations, out=out)

    @staticmethod
    def _backfill_units(declarations: dict, out: dict) -> None:
        """
        Move units from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring units.
            out: Output dictionary where we'll add the key "units" with the
                result.
        """
        if out.get("units") is None:
            out["units"] = {}
        resolved_units: dict = out["units"]
        if not declarations.get("units"):
            return

        for units_id, unit in declarations["units"].items():
            # Copy unit to resolved_units and fill any default values; a
            # missing name falls back to the unit's symbol
            assert resolved_units.get(units_id) is None
            resolved_units[units_id] = unit
            if not unit.get("name"):
                unit["name"] = unit["symbol"]
            if unit.get("description") is None:
                unit["description"] = ""

    @staticmethod
    def _backfill_attributes(declarations: dict, out: dict) -> None:
        """
        Move attributes from 'declarations' to 'out' while also filling in
        any default values.

        Args:
            declarations: The original YAML declaring attributes.
            out: Output dictionary where we'll add the key "attributes" with
                the result.
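
        Example (illustrative): a bare declaration such as:
            sample_rate: {}
        is copied into out['attributes'] and backfilled to:
            sample_rate: {name: "sample_rate", description: ""}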
        """
        if out.get("attributes") is None:
            out["attributes"] = {}
        resolved_attributes: dict = out["attributes"]
        if not declarations.get("attributes"):
            return

        for attr_id, attribute in declarations["attributes"].items():
            # Copy attribute to resolved_attributes and fill any default
            # values
            assert resolved_attributes.get(attr_id) is None
            resolved_attributes[attr_id] = attribute
            if not attribute.get("name"):
                attribute["name"] = attr_id
            if not attribute.get("description"):
                attribute["description"] = ""

    @staticmethod
    def _backfill_channels(declarations: dict, out: dict) -> None:
        """
        Move channels from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring channels.
            out: Output dictionary where we'll add the key "channels" with
                the result.
        """
        if out.get("channels") is None:
            out["channels"] = {}
        resolved_channels: dict = out["channels"]
        if not declarations.get("channels"):
            return

        for chan_id, channel in declarations["channels"].items():
            # Copy channel to resolved_channels and fill any default values
            assert resolved_channels.get(chan_id) is None
            resolved_channels[chan_id] = channel
            if not channel.get("name"):
                channel["name"] = chan_id
            if not channel.get("description"):
                channel["description"] = ""
            # Units must already be resolved (see _backfill_declarations)
            assert channel["units"] in out["units"], (
                f"'{channel['units']}' not found in\n"
                + f"{yaml.safe_dump(out.get('units', {}), indent=2)}"
            )

    @staticmethod
    def _backfill_triggers(declarations: dict, out: dict) -> None:
        """
        Move triggers from 'declarations' to 'out' while also filling in any
        default values.

        Args:
            declarations: The original YAML declaring triggers.
            out: Output dictionary where we'll add the key "triggers" with
                the result.
        """
        if out.get("triggers") is None:
            out["triggers"] = {}
        resolved_triggers: dict = out["triggers"]
        if not declarations.get("triggers"):
            return

        for trigger_id, trigger in declarations["triggers"].items():
            # Copy trigger to resolved_triggers and fill any default values
            assert resolved_triggers.get(trigger_id) is None
            resolved_triggers[trigger_id] = trigger
            if not trigger.get("name"):
                trigger["name"] = trigger_id
            if not trigger.get("description"):
                trigger["description"] = ""

    def _resolve_attributes(self, metadata: dict, out: dict) -> None:
        """
        For each attribute entry in the metadata, verify that the referenced
        attribute, channel, and units each match a definition in 'out'
        (which was already populated by _resolve_dependencies).

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: The current output, used to get attribute, channel, and
                units definitions

        Raises:
            RuntimeError: An error in the schema validation or a missing
                definition.
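
        Example (illustrative; 'frequency' stands in for whatever units id
        the dependency files define): an entry such as:
            - attribute: "sample_rate"
              channel: "acceleration"
              units: "frequency"
        is accepted only if 'sample_rate', 'acceleration', and 'frequency'
        were each resolved into 'out' by _resolve_dependencies.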
        """
        attributes: list | None = metadata.get("attributes")
        if not attributes:
            metadata["attributes"] = []
            self._logger.debug("No attributes found, skipping")
            return

        attribute: dict
        for attribute in attributes:
            assert attribute["attribute"] in out["attributes"]
            assert attribute["channel"] in out["channels"]
            assert attribute["units"] in out["units"]

    def _resolve_channels(self, metadata: dict, out: dict) -> None:
        """
        For each channel in the metadata, find the matching definition in the
        'out/channels' entry and use the data to fill any missing
        information. For example, if an entry exists that looks like:
            acceleration: []

        We would then try to find the 'acceleration' key in the out/channels
        dict (which was already validated by _resolve_dependencies). Since
        the example above does not override any fields, we would copy the
        'name', 'description', and 'units' from the definition into the
        channel entry.

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: The current output, used to get channel definitions

        Raises:
            RuntimeError: An error in the schema validation or a missing
                definition.
        """
        channels: dict | None = metadata.get("channels")
        if not channels:
            self._logger.debug("No channels found, skipping")
            metadata["channels"] = {}
            return

        channel_name: str
        indices: list[dict]
        for channel_name, indices in channels.items():
            # channel_name must have been resolved by now.
            if out["channels"].get(channel_name) is None:
                raise RuntimeError(
                    f"Failed to find a definition for '{channel_name}', did you"
                    " forget a dependency?"
                )
            channel = out["channels"][channel_name]
            # The content of 'channel' came from the 'out/channels' dict
            # which was already validated and every field added if missing.
            # At this point it's safe to access the channel's name,
            # description, and units.

            # A channel declared with an empty list gets a single default
            # index
            if not indices:
                indices.append({})

            index: dict
            for index in indices:
                if not index.get("name"):
                    index["name"] = channel["name"]
                if not index.get("description"):
                    index["description"] = channel["description"]
                # Always use the same units
                index["units"] = channel["units"]

    def _resolve_triggers(self, metadata: dict, out: dict) -> None:
        """
        For each trigger named in the metadata, verify that a matching
        definition exists in the 'out/triggers' dict (which was already
        populated by _resolve_dependencies).

        Args:
            metadata: The full sensor metadata passed to the validate function
            out: The current output, used to get trigger definitions

        Raises:
            RuntimeError: An error in the schema validation or a missing
                definition.
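
        Example (illustrative): a metadata entry such as:
            triggers:
              - "data_ready"
        passes only if 'data_ready' was defined by one of the files listed
        under 'deps'.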
        """
        triggers: list | None = metadata.get("triggers")
        if not triggers:
            metadata["triggers"] = []
            self._logger.debug("No triggers found, skipping")
            return

        for trigger_name in triggers:
            assert trigger_name in out["triggers"]

    def _get_dependency_file(self, dep: str) -> Path:
        """
        Search for a dependency file and return the full path to it if found.

        Args:
            dep: The dependency string as provided by the metadata YAML.

        Returns:
            The dependency file as a Path object, if found.

        Raises:
            FileNotFoundError: One of the dependencies was not found.
        """
        error_string = f"Failed to find {dep} using search paths:"
        # Check if a full path was used
        if Path(dep).is_file():
            return Path(dep)

        # Search all the include paths, in order
        for path in self._include_paths:
            if (path / dep).is_file():
                return path / dep
            error_string += f"\n- {path}"

        raise FileNotFoundError(error_string)
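

# Dependency lookup order used by _get_dependency_file (a sketch;
# "chans.yaml" and the include paths here are hypothetical): a dependency
# string that already names an existing file wins outright, otherwise the
# include paths are tried in construction order, so
# Validator(include_paths=[Path("a"), Path("b")]) resolves "chans.yaml" to
# a/chans.yaml before b/chans.yaml.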