xref: /aosp_15_r20/external/pigweed/pw_log/py/pw_log/log_decoder.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14
15"""Utils to decode logs."""
16
17from dataclasses import dataclass
18import datetime
19import logging
20import re
21from typing import Any, Callable
22
23from pw_log.proto import log_pb2
24import pw_log_tokenized
25import pw_status
26from pw_tokenizer import Detokenizer
27from pw_tokenizer.proto import decode_optionally_tokenized
28
# Module-level logger for decoder diagnostics (e.g. unexpected sequence IDs).
_LOG = logging.getLogger(__name__)
30
31
@dataclass(frozen=True)
class LogLineLevel:
    """Tuple of line number and level packed in LogEntry."""

    # Source file line number extracted from the packed line_level field.
    line: int
    # Log level expressed as a Python logging module level value.
    level: int
38
39
40class Log:
41    """A decoded, human-readable representation of a LogEntry.
42
43    Contains fields to represent a decoded pw_log/log.proto LogEntry message in
44    a human readable way.
45
46    Attributes:
47        message: The log message as a string.
48        level: A integer representing the log level, follows logging levels.
49        flags: An integer with the bit flags.
50        timestamp: A string representation of a timestamp.
51        module_name: The module name as a string.
52        thread_name: The thread name as a string.
53        source_name: The source name as a string.
54        file_and_line: The filepath and line as a string.
55        metadata_fields: Extra fields with string-string mapping.
56    """
57
58    _LOG_LEVEL_NAMES = {
59        logging.DEBUG: 'DBG',
60        logging.INFO: 'INF',
61        logging.WARNING: 'WRN',
62        logging.ERROR: 'ERR',
63        logging.CRITICAL: 'CRT',
64        # The logging module does not have a FATAL level. This module's log
65        # level values are 10 * PW_LOG_LEVEL values. PW_LOG_LEVEL_FATAL is 7.
66        70: 'FTL',
67    }
68    _LEVEL_MASK = 0x7  # pylint: disable=C0103
69    _LINE_OFFSET = 3  # pylint: disable=C0103
70
71    def __init__(
72        self,
73        message: str = '',
74        level: int = logging.NOTSET,
75        flags: int = 0,
76        timestamp: str = '',
77        module_name: str = '',
78        thread_name: str = '',
79        source_name: str = '',
80        file_and_line: str = '',
81        metadata_fields: dict[str, str] | None = None,
82    ) -> None:
83        self.message = message
84        self.level = level  # Value from logging levels.
85        self.flags = flags
86        self.timestamp = timestamp  # A human readable value.
87        self.module_name = module_name
88        self.thread_name = thread_name
89        self.source_name = source_name
90        self.file_and_line = file_and_line
91        self.metadata_fields = dict()
92        if metadata_fields:
93            self.metadata_fields.update(metadata_fields)
94
95    def __eq__(self, other: Any) -> bool:
96        if not isinstance(other, Log):
97            return False
98
99        return (
100            self.message == other.message
101            and self.level == other.level
102            and self.flags == other.flags
103            and self.timestamp == other.timestamp
104            and self.module_name == other.module_name
105            and self.thread_name == other.thread_name
106            and self.source_name == other.source_name
107            and self.file_and_line == other.file_and_line
108            and self.metadata_fields == other.metadata_fields
109        )
110
111    def __repr__(self) -> str:
112        return self.__str__()
113
114    def __str__(self) -> str:
115        level_name = self._LOG_LEVEL_NAMES.get(self.level, '')
116        metadata = ' '.join(map(str, self.metadata_fields.values()))
117        return (
118            f'{level_name} [{self.source_name}] {self.module_name} '
119            f'{self.timestamp} {self.message} {self.file_and_line} '
120            f'{metadata}'
121        ).strip()
122
123    @staticmethod
124    def pack_line_level(line: int, logging_log_level: int) -> int:
125        """Packs the line and level values into an integer as done in LogEntry.
126
127        Args:
128            line: the line number
129            level: the logging level using logging levels.
130        Returns:
131            An integer with the two values bitpacked.
132        """
133        return (line << Log._LINE_OFFSET) | (
134            Log.logging_level_to_pw_log_level(logging_log_level)
135            & Log._LEVEL_MASK
136        )
137
138    @staticmethod
139    def unpack_line_level(packed_line_level: int) -> LogLineLevel:
140        """Unpacks the line and level values packed as done in LogEntry.
141
142        Args:
143            An integer with the two values bitpacked.
144        Returns:
145            line: the line number
146            level: the logging level using logging levels.
147        """
148        line = packed_line_level >> Log._LINE_OFFSET
149        # Convert to logging module level number.
150        level = Log.pw_log_level_to_logging_level(
151            packed_line_level & Log._LEVEL_MASK
152        )
153        return LogLineLevel(line=line, level=level)
154
155    @staticmethod
156    def pw_log_level_to_logging_level(pw_log_level: int) -> int:
157        """Maps a pw_log/levels.h value to Python logging level value."""
158        return pw_log_level * 10
159
160    @staticmethod
161    def logging_level_to_pw_log_level(logging_log_level: int) -> int:
162        """Maps a Python logging level value to a pw_log/levels.h value."""
163        return int(logging_log_level / 10)
164
165
def pw_status_code_to_name(
    message: str, status_pattern: str = 'pw::Status=([0-9]+)'
) -> str:
    """Replaces the pw::Status code number with the status name.

    Searches for a matching pattern encapsulating the status code number and
    replaces it with the status name. This is useful when logging the status
    code instead of the string to save space.

    Args:
        message: The string to look for the status code.
        status_pattern: The regex pattern describing the format encapsulating
          the status code.
    Returns:
       The message string with the status name if found, else the original
       string.
    """
    # Valid status codes fall within [lowest_status, highest_status].
    all_status_values = [status.value for status in pw_status.Status]
    lowest_status = min(all_status_values)
    highest_status = max(all_status_values)

    def _code_to_name(match: re.Match) -> str:
        code = int(match.group(1))
        # Leave out-of-range numbers untouched.
        if code < lowest_status or code > highest_status:
            return match.group(0)
        return pw_status.Status(code).name

    return re.sub(status_pattern, _code_to_name, message)
196
197
def log_decoded_log(
    log: Log, logger: logging.Logger | logging.LoggerAdapter
) -> None:
    """Formats and saves the log information in a pw console compatible way.

    Args:
        log: The decoded Log whose fields are emitted.
        logger: The logger to emit the log information to.
    """
    # Mirror the display fields into the metadata so the pw console table
    # view can render them as columns.
    table_fields = (
        ('module', log.module_name),
        ('source_name', log.source_name),
        ('timestamp', log.timestamp),
        ('msg', log.message),
        ('file', log.file_and_line),
    )
    for key, value in table_fields:
        log.metadata_fields[key] = value

    logger.log(
        log.level,
        '[%s] %s %s %s %s',
        log.source_name,
        log.module_name,
        log.timestamp,
        log.message,
        log.file_and_line,
        extra=dict(extra_metadata_fields=log.metadata_fields),
    )
223
224
225def _timestamp_format(
226    timestamp: int,
227    input_resolution: int,
228    truncate_microseconds_to_three_digits: bool = False,
229) -> str:
230    time_delta = datetime.timedelta(seconds=timestamp / input_resolution)
231
232    minutes, seconds = divmod(time_delta.seconds, 60)
233    hours, minutes = divmod(minutes, 60)
234    microseconds = time_delta.microseconds
235
236    hms_str = f'{hours:02}:{minutes:02}:{seconds:02}'
237    microseconds_str = f'{microseconds:06}'
238    if truncate_microseconds_to_three_digits:
239        microseconds_str = microseconds_str[:-3]
240
241    return f'{hms_str}.{microseconds_str}'
242
243
def timestamp_parser_ms_since_boot(timestamp: int) -> str:
    """Decodes timestamp as milliseconds since boot.

    Args:
        timestamp: The timestamp as an integer.
    Returns:
        A string representation of the timestamp.
    """
    # A millisecond clock has 1000 ticks per second; only three fractional
    # digits are meaningful, so the microsecond tail is truncated.
    return _timestamp_format(
        timestamp,
        input_resolution=1000,
        truncate_microseconds_to_three_digits=True,
    )
257
258
def timestamp_parser_ns_since_boot(timestamp: int) -> str:
    """Decodes timestamp as nanoseconds since boot.

    Args:
        timestamp: The timestamp as an integer.
    Returns:
        A string representation of the timestamp.
    """
    # A nanosecond clock has 10**9 ticks per second; keep the full
    # microsecond precision in the output.
    return _timestamp_format(timestamp, input_resolution=1_000_000_000)
268
269
class LogStreamDecoder:
    """Decodes an RPC stream of LogEntries packets.

    Performs log drop detection on the stream of LogEntries proto messages.

    Args:
        decoded_log_handler: Callback called on each decoded log.
        detokenizer: Detokenizes log messages if tokenized when provided.
        source_name: Optional string to identify the logs source.
        timestamp_parser: Optional parser from a timestamp number to a string.
        message_parser: Optional message parser called after detokenization is
          attempted on a log message.
    """

    # Reasons embedded in the synthesized "Dropped N logs" messages.
    DROP_REASON_LOSS_AT_TRANSPORT = 'loss at transport'
    DROP_REASON_SOURCE_NOT_CONNECTED = 'source not connected'
    DROP_REASON_SOURCE_ENQUEUE_FAILURE = 'enqueue failure at source'

    def __init__(
        self,
        decoded_log_handler: Callable[[Log], None],
        detokenizer: Detokenizer | None = None,
        source_name: str = '',
        timestamp_parser: Callable[[int], str] | None = None,
        message_parser: Callable[[str], str] | None = None,
    ):
        self.decoded_log_handler = decoded_log_handler
        self.detokenizer = detokenizer
        self.source_name = source_name
        self.timestamp_parser = timestamp_parser
        self.message_parser = message_parser
        # Sequence ID expected in the next LogEntries packet; used to detect
        # dropped logs. Zero means no packet has been received yet.
        self._expected_log_sequence_id = 0

    def parse_log_entries_proto(
        self, log_entries_proto: log_pb2.LogEntries
    ) -> None:
        """Parses each LogEntry in log_entries_proto.

        Detects dropped logs via the packet sequence ID, reports them as a
        synthesized warning Log, then passes each decoded log to
        decoded_log_handler. Returns nothing; results flow through the
        handler callback.

        Args:
            log_entries_proto: A LogEntries message proto.
        """
        has_received_logs = self._expected_log_sequence_id > 0
        dropped_log_count = self._calculate_dropped_logs(log_entries_proto)
        if dropped_log_count > 0:
            # Before any packet has been received, a gap means the source was
            # not yet connected rather than loss in transport.
            reason = (
                self.DROP_REASON_LOSS_AT_TRANSPORT
                if has_received_logs
                else self.DROP_REASON_SOURCE_NOT_CONNECTED
            )
            self.decoded_log_handler(
                self._handle_log_drop_count(dropped_log_count, reason)
            )
        elif dropped_log_count < 0:
            # Sequence ID went backwards; nothing sensible to report as drops.
            _LOG.error('Log sequence ID is smaller than expected')

        for i, log_entry_proto in enumerate(log_entries_proto.entries):
            # Handle dropped count first.
            if log_entry_proto.dropped:
                # Avoid duplicating drop reports since the device will report
                # a drop count due to a transmission failure, of the last
                # attempted transmission only, in the first entry of the next
                # successful transmission.
                if i == 0 and dropped_log_count >= log_entry_proto.dropped:
                    continue
            parsed_log = self.parse_log_entry_proto(log_entry_proto)
            self.decoded_log_handler(parsed_log)

    def parse_log_entry_proto(self, log_entry_proto: log_pb2.LogEntry) -> Log:
        """Parses the log_entry_proto contents into a human readable format.

        Args:
            log_entry_proto: A LogEntry message proto.
        Returns:
            A Log object with the decoded log_entry_proto.
        """
        detokenized_message = decode_optionally_tokenized(
            self.detokenizer, log_entry_proto.message
        )
        # Handle dropped count first: entries carrying a drop count are
        # converted into a synthesized "Dropped N logs" warning instead.
        if log_entry_proto.dropped:
            drop_reason = self.DROP_REASON_SOURCE_ENQUEUE_FAILURE
            # A non-empty message overrides the default reason.
            if detokenized_message:
                drop_reason = detokenized_message.lower()
            return self._handle_log_drop_count(
                log_entry_proto.dropped, drop_reason
            )

        # Parse message and metadata, if any, encoded in a key-value format as
        # described in pw_log/log.proto LogEntry::message field.
        message_and_metadata = pw_log_tokenized.FormatStringWithMetadata(
            detokenized_message
        )
        # The module field in the proto wins; fall back to the metadata.
        module_name = decode_optionally_tokenized(
            self.detokenizer, log_entry_proto.module
        )
        if not module_name:
            module_name = message_and_metadata.module

        line_level_tuple = Log.unpack_line_level(log_entry_proto.line_level)
        file_and_line = decode_optionally_tokenized(
            self.detokenizer, log_entry_proto.file
        )
        if not file_and_line:
            # Set file name if found in the metadata.
            if message_and_metadata.file is not None:
                file_and_line = message_and_metadata.file
            else:
                file_and_line = ''

        # Add line number to filepath if needed (a ':' already present means
        # the path carries a line number).
        if line_level_tuple.line and ':' not in file_and_line:
            file_and_line += f':{line_level_tuple.line}'
        # Add extra log information avoiding duplicated data.
        metadata_fields = {
            k: v
            for k, v in message_and_metadata.fields.items()
            if k not in ['file', 'module', 'msg']
        }

        message = message_and_metadata.message
        if self.message_parser:
            message = self.message_parser(message)
        # Without a parser the raw tick count is shown as-is.
        if self.timestamp_parser:
            timestamp = self.timestamp_parser(log_entry_proto.timestamp)
        else:
            timestamp = str(log_entry_proto.timestamp)
        log = Log(
            message=message,
            level=line_level_tuple.level,
            flags=log_entry_proto.flags,
            timestamp=timestamp,
            module_name=module_name,
            thread_name=decode_optionally_tokenized(
                self.detokenizer, log_entry_proto.thread
            ),
            source_name=self.source_name,
            file_and_line=file_and_line,
            metadata_fields=metadata_fields,
        )

        return log

    def _handle_log_drop_count(self, drop_count: int, reason: str) -> Log:
        """Creates a warning Log reporting drop_count dropped logs."""
        log_word = 'logs' if drop_count > 1 else 'log'
        log = Log(
            message=f'Dropped {drop_count} {log_word} due to {reason}',
            level=logging.WARNING,
            source_name=self.source_name,
        )
        return log

    def _calculate_dropped_logs(
        self, log_entries_proto: log_pb2.LogEntries
    ) -> int:
        """Returns the gap between the expected and received sequence IDs.

        Also advances the expected sequence ID past this packet. A negative
        return value means the sequence ID went backwards.
        """
        # Count log messages received that don't use the dropped field.
        messages_received = sum(
            1 if not log_proto.dropped else 0
            for log_proto in log_entries_proto.entries
        )
        dropped_log_count = (
            log_entries_proto.first_entry_sequence_id
            - self._expected_log_sequence_id
        )
        self._expected_log_sequence_id = (
            log_entries_proto.first_entry_sequence_id + messages_received
        )
        return dropped_log_count
439