# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Sensor schema validation tooling."""

from collections.abc import Sequence
import copy
import importlib.resources
import logging
from pathlib import Path

import jsonschema  # type: ignore
import jsonschema.exceptions  # type: ignore
import yaml

_METADATA_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "metadata_schema.json")
)

_DEPENDENCY_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "dependency_schema.json")
)

_RESOLVED_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "resolved_schema.json")
)


class Validator:
    """
    Context used for validating metadata dictionaries.

    What the validator is:
    - A system that resolves and verifies that declared sensor metadata is
      well defined and well formatted
    - A utility that resolves any and all dependencies used by a specified
      metadata file

    What the validator is NOT:
    - A code generator
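
    Example (a minimal usage sketch; the file name and search path below are
    hypothetical, and the metadata file must declare its channels via 'deps'):

        validator = Validator(include_paths=[Path("sensor_definitions")])
        with open("bma4xx.yaml", encoding="utf-8") as sensor_file:
            resolved = validator.validate(yaml.safe_load(sensor_file))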
50    """
51
52    def __init__(
53        self,
54        include_paths: Sequence[Path] | None = None,
55        log_level: int = logging.WARNING,
56    ) -> None:
57        """
58        Construct a Validator with some context of the current run.
59
60        Args:
61          include_paths: An optional list of directories in which to resolve
62            dependencies
63          log_level: A desired logging level (defaults to logging.WARNING)
64        """
65        self._include_paths = include_paths if include_paths else []
66        self._logger = logging.getLogger(self.__class__.__name__)
67        self._logger.setLevel(log_level)
68
    def validate(self, metadata: dict) -> dict:
        """
        Validate a structured metadata description. The dictionary must first
        pass the schema provided in metadata_schema.json. Then, every channel
        used by the sensor must be defined in exactly one of the dependencies.
        Example YAML:

          deps:
            - "pw_sensor/channels.yaml"
          compatible:
            org: "Bosch"
            part: "BMA4xx"
          supported-buses:
            - i2c
          channels:
            acceleration: []
            die_temperature: []

        Args:
          metadata: Structured sensor data; this dictionary will NOT be
            modified

        Returns:
          A set of attributes, channels, triggers, and units along with a
          single sensor which match the schema in resolved_schema.json.

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
          FileNotFoundError: One of the dependencies was not found.
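
        The returned dictionary is shaped roughly as follows (a sketch for the
        example above; the channel, attribute, trigger, and unit details come
        from the dependency files):

          attributes: {...}
          channels: {...}
          triggers: {...}
          units: {...}
          sensors:
            "Bosch,BMA4xx":
              compatible: {org: "Bosch", part: "BMA4xx"}
              supported-buses: [i2c]
              channels: {...}
              attributes: []
              triggers: []
              description: ""
              extras: {}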
98        """
99        result: dict = {
100            "attributes": {},
101            "channels": {},
102            "triggers": {},
103            "units": {},
104            "sensors": {},
105        }
106        metadata = metadata.copy()
107
108        # Validate the incoming schema
109        try:
110            jsonschema.validate(instance=metadata, schema=_METADATA_SCHEMA)
111        except jsonschema.exceptions.ValidationError as e:
112            raise RuntimeError(
113                "ERROR: Malformed sensor metadata YAML:\n"
114                f"{yaml.safe_dump(metadata, indent=2)}"
115            ) from e
116
117        # Resolve all the dependencies, after this, 'result' will have all the
118        # missing properties for which defaults can be provided
119        self._resolve_dependencies(metadata=metadata, out=result)

        self._logger.debug(
            "Resolved dependencies:\n%s", yaml.safe_dump(result, indent=2)
        )

        # Resolve all channel entries
        self._resolve_channels(metadata=metadata, out=result)

        # Resolve all attribute entries
        self._resolve_attributes(metadata=metadata, out=result)

        # Resolve all trigger entries
        self._resolve_triggers(metadata=metadata, out=result)

        compatible = metadata.pop("compatible")
        supported_buses = metadata.pop("supported-buses")
        channels = metadata.pop("channels")
        attributes = metadata.pop("attributes")
        triggers = metadata.pop("triggers")
        result["sensors"][f"{compatible['org']},{compatible['part']}"] = {
            "compatible": compatible,
            "supported-buses": supported_buses,
            "channels": channels,
            "attributes": attributes,
            "triggers": triggers,
            "description": metadata.get("description", ""),
            "extras": metadata.get("extras", {}),
        }

        # Validate the final output before returning
        try:
            jsonschema.validate(instance=result, schema=_RESOLVED_SCHEMA)
        except jsonschema.exceptions.ValidationError as e:
            raise RuntimeError(
                "ERROR: Malformed output YAML: "
                f"{yaml.safe_dump(result, indent=2)}"
            ) from e

        return result

    def _resolve_dependencies(self, metadata: dict, out: dict) -> None:
        """
        Given a list of dependencies, ensure that each of them exists and
        matches the schema provided in dependency_schema.json. Once loaded, the
        content of the definition file is resolved (filling in any missing
        fields that can be inherited) and the final result is placed in the
        'out' dictionary.

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: Output dictionary where the resolved dependencies should be
            stored

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
          FileNotFoundError: One of the dependencies was not found.
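
        Example dependency file (a minimal sketch; the authoritative schema is
        dependency_schema.json, and the fields below mirror the backfill logic
        in this class):

          units:
            acceleration:
              symbol: "m/s^2"
          channels:
            acceleration:
              units: "acceleration"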
177        """
178        deps: None | list[str] = metadata.get("deps")
179        if not deps:
180            self._logger.debug("No dependencies found, skipping imports")
181            return
182
183        merged_deps: dict = {
184            "attributes": {},
185            "channels": {},
186            "triggers": {},
187            "units": {},
188        }
189        for dep in deps:
190            # Load each of the dependencies, then merge them. This avoids any
191            # include dependency order issues.
192            dep_file = self._get_dependency_file(dep)
193            with open(dep_file, mode="r", encoding="utf-8") as dep_yaml_file:
194                dep_yaml = yaml.safe_load(dep_yaml_file)
195                try:
196                    jsonschema.validate(
197                        instance=dep_yaml, schema=_DEPENDENCY_SCHEMA
198                    )
199                except jsonschema.exceptions.ValidationError as e:
200                    raise RuntimeError(
201                        "ERROR: Malformed dependency YAML: "
202                        f"{yaml.safe_dump(dep_yaml, indent=2)}"
203                    ) from e
204                # Merge all the loaded values into 'merged_deps'
205                for category in merged_deps:
206                    self._merge_deps(
207                        category=category,
208                        dep_yaml=dep_yaml,
209                        merged_deps=merged_deps,
210                    )
211        # Backfill any default values from the merged dependencies and put them
212        # into 'out'
213        self._backfill_declarations(declarations=merged_deps, out=out)
214
    @staticmethod
    def _merge_deps(category: str, dep_yaml: dict, merged_deps: dict) -> None:
        """
        Pull all properties from dep_yaml[category] and put them into
        merged_deps after validating that no key duplicates exist.

        Args:
          category: The key of dep_yaml and merged_deps to merge
          dep_yaml: The newly loaded dependency YAML
          merged_deps: The accumulated dependency map
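
        Example (sketch): merging {"channels": {"acceleration": {...}}} under
        category "channels" adds an "acceleration" entry to
        merged_deps["channels"]; a second dependency declaring the same key
        would trip the duplicate-key assertion below.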
225        """
226        for key, value in dep_yaml.get(category, {}).items():
227            assert (
228                key not in merged_deps[category]
229            ), f"'{key}' was already found under '{category}'"
230            merged_deps[category][key] = value
231
    def _backfill_declarations(self, declarations: dict, out: dict) -> None:
        """
        Add any missing properties of a declaration object.

        Args:
          declarations: The top level declarations dictionary loaded from the
            dependency file.
          out: The already resolved map of all defined dependencies
        """
        self._backfill_units(declarations=declarations, out=out)
        self._backfill_channels(declarations=declarations, out=out)
        self._backfill_attributes(declarations=declarations, out=out)
        self._backfill_triggers(declarations=declarations, out=out)

    @staticmethod
    def _backfill_units(declarations: dict, out: dict) -> None:
        """
        Move units from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring units.
          out: Output dictionary where we'll add the key "units" with the
            result.
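
        Example (sketch): a declared unit {"m_s2": {"symbol": "m/s^2"}}
        resolves to:

          m_s2:
            symbol: "m/s^2"
            name: "m/s^2"
            description: ""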
255        """
256        if out.get("units") is None:
257            out["units"] = {}
258        resolved_units: dict = out["units"]
259        if not declarations.get("units"):
260            return
261
262        for units_id, unit in declarations["units"].items():
263            # Copy unit to resolved_units and fill any default values
264            assert resolved_units.get(units_id) is None
265            resolved_units[units_id] = unit
266            if not unit.get("name"):
267                unit["name"] = unit["symbol"]
268            if unit.get("description") is None:
269                unit["description"] = ""
270
    @staticmethod
    def _backfill_attributes(declarations: dict, out: dict) -> None:
        """
        Move attributes from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring attributes.
          out: Output dictionary where we'll add the key "attributes" with the
            result.
        """
        if out.get("attributes") is None:
            out["attributes"] = {}
        resolved_attributes: dict = out["attributes"]
        if not declarations.get("attributes"):
            return

        for attr_id, attribute in declarations["attributes"].items():
            # Copy attribute to resolved_attributes and fill any default values
            assert resolved_attributes.get(attr_id) is None
            resolved_attributes[attr_id] = attribute
            if not attribute.get("name"):
                attribute["name"] = attr_id
            if not attribute.get("description"):
                attribute["description"] = ""

    @staticmethod
    def _backfill_channels(declarations: dict, out: dict) -> None:
        """
        Move channels from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring channels.
          out: Output dictionary where we'll add the key "channels" with the
            result.
        """
        if out.get("channels") is None:
            out["channels"] = {}
        resolved_channels: dict = out["channels"]
        if not declarations.get("channels"):
            return

        for chan_id, channel in declarations["channels"].items():
            # Copy channel to resolved_channels and fill any default values
            assert resolved_channels.get(chan_id) is None
            resolved_channels[chan_id] = channel
            if not channel.get("name"):
                channel["name"] = chan_id
            if not channel.get("description"):
                channel["description"] = ""
            assert channel["units"] in out["units"], (
                f"'{channel['units']}' not found in\n"
                + f"{yaml.safe_dump(out.get('units', {}), indent=2)}"
            )

    @staticmethod
    def _backfill_triggers(declarations: dict, out: dict) -> None:
        """
        Move triggers from 'declarations' to 'out' while also filling in any
        default values.

        Args:
          declarations: The original YAML declaring triggers.
          out: Output dictionary where we'll add the key "triggers" with the
            result.
        """
        if out.get("triggers") is None:
            out["triggers"] = {}
        resolved_triggers: dict = out["triggers"]
        if not declarations.get("triggers"):
            return

        for trigger_id, trigger in declarations["triggers"].items():
            # Copy trigger to resolved_triggers and fill any default values
            assert resolved_triggers.get(trigger_id) is None
            resolved_triggers[trigger_id] = trigger
            if not trigger.get("name"):
                trigger["name"] = trigger_id
            if not trigger.get("description"):
                trigger["description"] = ""

    def _resolve_attributes(self, metadata: dict, out: dict) -> None:
        """
        For each attribute in the metadata, verify that the referenced
        attribute, channel, and units are all defined in 'out' (which was
        already populated by _resolve_dependencies). For example, an entry
        that looks like (names illustrative):
            - attribute: "sample_rate"
              channel: "acceleration"
              units: "frequency"

        requires the 'sample_rate' key in out/attributes, the 'acceleration'
        key in out/channels, and the 'frequency' key in out/units.

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: The current output, used to get attribute, channel, and units
            definitions

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
        """
        attributes: list | None = metadata.get("attributes")
        if not attributes:
            metadata["attributes"] = []
            self._logger.debug("No attributes found, skipping")
            return

        attribute: dict
        for attribute in attributes:
            # Every referenced attribute, channel, and unit must have been
            # defined by one of the dependencies.
            assert attribute["attribute"] in out["attributes"]
            assert attribute["channel"] in out["channels"]
            assert attribute["units"] in out["units"]

    def _resolve_channels(self, metadata: dict, out: dict) -> None:
        """
        For each channel in the metadata, find the matching definition in the
        'out/channels' entry and use the data to fill any missing information.
        For example, if an entry exists that looks like:
            acceleration: []

        We would then try to find the 'acceleration' key in the out/channels
        dict (which was already validated by _resolve_dependencies). Since the
        example above does not override any fields, we would copy the 'name',
        'description', and 'units' from the definition into the channel entry.
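
        The entry then resolves to a list with one fully populated index
        (a sketch; the actual name, description, and units come from the
        channel's definition):
            acceleration:
              - name: "acceleration"
                description: "..."
                units: "acceleration"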

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: The current output, used to get channel definitions

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
        """
        channels: dict | None = metadata.get("channels")
        if not channels:
            self._logger.debug("No channels found, skipping")
            metadata["channels"] = {}
            return

        channel_name: str
        indices: list[dict]
        for channel_name, indices in channels.items():
            # channel_name must have been resolved by now.
            if out["channels"].get(channel_name) is None:
                raise RuntimeError(
                    f"Failed to find a definition for '{channel_name}', did you"
                    " forget a dependency?"
                )
            channel = out["channels"][channel_name]
            # The content of 'channel' came from the 'out/channels' dict which
            # was already validated and every field added if missing. At this
            # point it's safe to access the channel's name, description, and
            # units.

            if not indices:
                indices.append({})

            index: dict
            for index in indices:
                if not index.get("name"):
                    index["name"] = channel["name"]
                if not index.get("description"):
                    index["description"] = channel["description"]
                # Always use the same units
                index["units"] = channel["units"]

    def _resolve_triggers(self, metadata: dict, out: dict) -> None:
        """
        For each trigger named in the metadata, verify that a matching
        definition exists in the 'out/triggers' entry (which was already
        populated by _resolve_dependencies). For example, an entry that looks
        like:
            triggers:
              - "data_ready"

        requires the 'data_ready' key to exist in the out/triggers dict.

        Args:
          metadata: The full sensor metadata passed to the validate function
          out: The current output, used to get trigger definitions

        Raises:
          RuntimeError: An error in the schema validation or a missing
            definition.
        """
        triggers: list | None = metadata.get("triggers")
        if not triggers:
            metadata["triggers"] = []
            self._logger.debug("No triggers found, skipping")
            return

        for trigger_name in triggers:
            # Every referenced trigger must be defined by a dependency.
            assert trigger_name in out["triggers"]

    def _get_dependency_file(self, dep: str) -> Path:
        """
        Search for a dependency file and return the full path to it if found.

        Args:
          dep: The dependency string as provided by the metadata YAML.

        Returns:
          The dependency file as a Path object if found.

        Raises:
          FileNotFoundError: One of the dependencies was not found.
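
        Example (sketch): with include_paths=[Path("a"), Path("b")], the
        dependency "pw_sensor/channels.yaml" resolves to the first of
        "a/pw_sensor/channels.yaml" or "b/pw_sensor/channels.yaml" that
        exists; a dependency that is already a valid file path is returned
        as-is.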
479        """
480        error_string = f"Failed to find {dep} using search paths:"
481        # Check if a full path was used
482        if Path(dep).is_file():
483            return Path(dep)
484
485        # Search all the include paths
486        for path in self._include_paths:
487            if (path / dep).is_file():
488                return path / dep
489            error_string += f"\n- {path}"
490
491        raise FileNotFoundError(error_string)
492