# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Utils to decode logs."""

from dataclasses import dataclass
import datetime
import logging
import re
from typing import Any, Callable

from pw_log.proto import log_pb2
import pw_log_tokenized
import pw_status
from pw_tokenizer import Detokenizer
from pw_tokenizer.proto import decode_optionally_tokenized

_LOG = logging.getLogger(__name__)


@dataclass(frozen=True)
class LogLineLevel:
    """Tuple of line number and level packed in LogEntry."""

    line: int
    level: int


class Log:
    """A decoded, human-readable representation of a LogEntry.

    Contains fields to represent a decoded pw_log/log.proto LogEntry message in
    a human readable way.

    Attributes:
        message: The log message as a string.
        level: A integer representing the log level, follows logging levels.
        flags: An integer with the bit flags.
        timestamp: A string representation of a timestamp.
        module_name: The module name as a string.
        thread_name: The thread name as a string.
        source_name: The source name as a string.
        file_and_line: The filepath and line as a string.
        metadata_fields: Extra fields with string-string mapping.
    """

    _LOG_LEVEL_NAMES = {
        logging.DEBUG: 'DBG',
        logging.INFO: 'INF',
        logging.WARNING: 'WRN',
        logging.ERROR: 'ERR',
        logging.CRITICAL: 'CRT',
        # The logging module does not have a FATAL level. This module's log
        # level values are 10 * PW_LOG_LEVEL values. PW_LOG_LEVEL_FATAL is 7.
        70: 'FTL',
    }
    # LogEntry.line_level packs the level in the low 3 bits and the line
    # number in the remaining high bits.
    _LEVEL_MASK = 0x7  # pylint: disable=C0103
    _LINE_OFFSET = 3  # pylint: disable=C0103

    def __init__(
        self,
        message: str = '',
        level: int = logging.NOTSET,
        flags: int = 0,
        timestamp: str = '',
        module_name: str = '',
        thread_name: str = '',
        source_name: str = '',
        file_and_line: str = '',
        metadata_fields: dict[str, str] | None = None,
    ) -> None:
        self.message = message
        self.level = level  # Value from logging levels.
        self.flags = flags
        self.timestamp = timestamp  # A human readable value.
        self.module_name = module_name
        self.thread_name = thread_name
        self.source_name = source_name
        self.file_and_line = file_and_line
        # Copy into a fresh dict so callers' mappings are never aliased.
        self.metadata_fields: dict[str, str] = dict()
        if metadata_fields:
            self.metadata_fields.update(metadata_fields)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Log):
            return False

        return (
            self.message == other.message
            and self.level == other.level
            and self.flags == other.flags
            and self.timestamp == other.timestamp
            and self.module_name == other.module_name
            and self.thread_name == other.thread_name
            and self.source_name == other.source_name
            and self.file_and_line == other.file_and_line
            and self.metadata_fields == other.metadata_fields
        )

    def __repr__(self) -> str:
        return self.__str__()

    def __str__(self) -> str:
        level_name = self._LOG_LEVEL_NAMES.get(self.level, '')
        metadata = ' '.join(map(str, self.metadata_fields.values()))
        return (
            f'{level_name} [{self.source_name}] {self.module_name} '
            f'{self.timestamp} {self.message} {self.file_and_line} '
            f'{metadata}'
        ).strip()

    @staticmethod
    def pack_line_level(line: int, logging_log_level: int) -> int:
        """Packs the line and level values into an integer as done in LogEntry.

        Args:
            line: the line number
            logging_log_level: the log level using logging module levels.
        Returns:
            An integer with the two values bitpacked.
        """
        return (line << Log._LINE_OFFSET) | (
            Log.logging_level_to_pw_log_level(logging_log_level)
            & Log._LEVEL_MASK
        )

    @staticmethod
    def unpack_line_level(packed_line_level: int) -> LogLineLevel:
        """Unpacks the line and level values packed as done in LogEntry.

        Args:
            packed_line_level: An integer with the two values bitpacked.
        Returns:
            A LogLineLevel with the line number and the logging module level.
        """
        line = packed_line_level >> Log._LINE_OFFSET
        # Convert to logging module level number.
        level = Log.pw_log_level_to_logging_level(
            packed_line_level & Log._LEVEL_MASK
        )
        return LogLineLevel(line=line, level=level)

    @staticmethod
    def pw_log_level_to_logging_level(pw_log_level: int) -> int:
        """Maps a pw_log/levels.h value to Python logging level value."""
        return pw_log_level * 10

    @staticmethod
    def logging_level_to_pw_log_level(logging_log_level: int) -> int:
        """Maps a Python logging level value to a pw_log/levels.h value."""
        return int(logging_log_level / 10)


def pw_status_code_to_name(
    message: str, status_pattern: str = 'pw::Status=([0-9]+)'
) -> str:
    """Replaces the pw::Status code number with the status name.

    Searches for a matching pattern encapsulating the status code number and
    replaces it with the status name. This is useful when logging the status
    code instead of the string to save space.

    Args:
        message: The string to look for the status code.
        status_pattern: The regex pattern describing the format encapsulating
          the status code.
    Returns:
        The message string with the status name if found, else the original
        string.
    """
    # Min and max values for Status.
    max_status_value = max(status.value for status in pw_status.Status)
    min_status_value = min(status.value for status in pw_status.Status)

    def replacement_callback(match: re.Match) -> str:
        # Only translate codes inside the valid Status range; leave any
        # out-of-range number untouched.
        status_code = int(match.group(1))
        if min_status_value <= status_code <= max_status_value:
            return pw_status.Status(status_code).name
        return match.group(0)

    return re.sub(
        pattern=status_pattern, repl=replacement_callback, string=message
    )


def log_decoded_log(
    log: Log, logger: logging.Logger | logging.LoggerAdapter
) -> None:
    """Formats and saves the log information in a pw console compatible way.

    Args:
        log: The decoded Log to emit.
        logger: The logger to emit the log information to.
    """
    # Fields used for pw console table view.
    log.metadata_fields['module'] = log.module_name
    log.metadata_fields['source_name'] = log.source_name
    log.metadata_fields['timestamp'] = log.timestamp
    log.metadata_fields['msg'] = log.message
    log.metadata_fields['file'] = log.file_and_line

    logger.log(
        log.level,
        '[%s] %s %s %s %s',
        log.source_name,
        log.module_name,
        log.timestamp,
        log.message,
        log.file_and_line,
        extra=dict(extra_metadata_fields=log.metadata_fields),
    )


def _timestamp_format(
    timestamp: int,
    input_resolution: int,
    truncate_microseconds_to_three_digits: bool = False,
) -> str:
    """Formats a tick count as HH:MM:SS.micro (or .milli) elapsed time.

    Args:
        timestamp: Tick count since boot.
        input_resolution: Ticks per second (e.g. 10**3 for milliseconds).
        truncate_microseconds_to_three_digits: Drop the last three digits of
          the microseconds field, leaving millisecond precision.
    Returns:
        A string representation of the elapsed time.
    """
    time_delta = datetime.timedelta(seconds=timestamp / input_resolution)

    minutes, seconds = divmod(time_delta.seconds, 60)
    hours, minutes = divmod(minutes, 60)
    microseconds = time_delta.microseconds

    hms_str = f'{hours:02}:{minutes:02}:{seconds:02}'
    microseconds_str = f'{microseconds:06}'
    if truncate_microseconds_to_three_digits:
        microseconds_str = microseconds_str[:-3]

    return f'{hms_str}.{microseconds_str}'


def timestamp_parser_ms_since_boot(timestamp: int) -> str:
    """Decodes timestamp as milliseconds since boot.

    Args:
        timestamp: The timestamp as an integer.
    Returns:
        A string representation of the timestamp.
    """
    return _timestamp_format(
        timestamp,
        input_resolution=10**3,
        truncate_microseconds_to_three_digits=True,
    )


def timestamp_parser_ns_since_boot(timestamp: int) -> str:
    """Decodes timestamp as nanoseconds since boot.

    Args:
        timestamp: The timestamp as an integer.
    Returns:
        A string representation of the timestamp.
    """
    return _timestamp_format(timestamp, input_resolution=10**9)


class LogStreamDecoder:
    """Decodes an RPC stream of LogEntries packets.

    Performs log drop detection on the stream of LogEntries proto messages.

    Args:
        decoded_log_handler: Callback called on each decoded log.
        detokenizer: Detokenizes log messages if tokenized when provided.
        source_name: Optional string to identify the logs source.
        timestamp_parser: Optional timestamp parser number to a string.
        message_parser: Optional message parser called after detokenization is
          attempted on a log message.
    """

    DROP_REASON_LOSS_AT_TRANSPORT = 'loss at transport'
    DROP_REASON_SOURCE_NOT_CONNECTED = 'source not connected'
    DROP_REASON_SOURCE_ENQUEUE_FAILURE = 'enqueue failure at source'

    def __init__(
        self,
        decoded_log_handler: Callable[[Log], None],
        detokenizer: Detokenizer | None = None,
        source_name: str = '',
        timestamp_parser: Callable[[int], str] | None = None,
        message_parser: Callable[[str], str] | None = None,
    ):
        self.decoded_log_handler = decoded_log_handler
        self.detokenizer = detokenizer
        self.source_name = source_name
        self.timestamp_parser = timestamp_parser
        self.message_parser = message_parser
        self._expected_log_sequence_id = 0

    def parse_log_entries_proto(
        self, log_entries_proto: log_pb2.LogEntries
    ) -> None:
        """Parses each LogEntry in log_entries_proto.

        Detects dropped logs from the sequence ids, then passes each decoded
        Log to decoded_log_handler.

        Args:
            log_entries_proto: A LogEntries message proto.
        """
        has_received_logs = self._expected_log_sequence_id > 0
        dropped_log_count = self._calculate_dropped_logs(log_entries_proto)
        if dropped_log_count > 0:
            reason = (
                self.DROP_REASON_LOSS_AT_TRANSPORT
                if has_received_logs
                else self.DROP_REASON_SOURCE_NOT_CONNECTED
            )
            self.decoded_log_handler(
                self._handle_log_drop_count(dropped_log_count, reason)
            )
        elif dropped_log_count < 0:
            _LOG.error('Log sequence ID is smaller than expected')

        for i, log_entry_proto in enumerate(log_entries_proto.entries):
            # Handle dropped count first.
            if log_entry_proto.dropped:
                # Avoid duplicating drop reports since the device will report
                # a drop count due to a transmission failure, of the last
                # attempted transmission only, in the first entry of the next
                # successful transmission.
                if i == 0 and dropped_log_count >= log_entry_proto.dropped:
                    continue
            parsed_log = self.parse_log_entry_proto(log_entry_proto)
            self.decoded_log_handler(parsed_log)

    def parse_log_entry_proto(self, log_entry_proto: log_pb2.LogEntry) -> Log:
        """Parses the log_entry_proto contents into a human readable format.

        Args:
            log_entry_proto: A LogEntry message proto.
        Returns:
            A Log object with the decoded log_entry_proto.
        """
        detokenized_message = decode_optionally_tokenized(
            self.detokenizer, log_entry_proto.message
        )
        # Handle dropped count first.
        if log_entry_proto.dropped:
            drop_reason = self.DROP_REASON_SOURCE_ENQUEUE_FAILURE
            if detokenized_message:
                drop_reason = detokenized_message.lower()
            return self._handle_log_drop_count(
                log_entry_proto.dropped, drop_reason
            )

        # Parse message and metadata, if any, encoded in a key-value format as
        # described in pw_log/log.proto LogEntry::message field.
        message_and_metadata = pw_log_tokenized.FormatStringWithMetadata(
            detokenized_message
        )
        module_name = decode_optionally_tokenized(
            self.detokenizer, log_entry_proto.module
        )
        if not module_name:
            module_name = message_and_metadata.module

        line_level_tuple = Log.unpack_line_level(log_entry_proto.line_level)
        file_and_line = decode_optionally_tokenized(
            self.detokenizer, log_entry_proto.file
        )
        if not file_and_line:
            # Set file name if found in the metadata.
            if message_and_metadata.file is not None:
                file_and_line = message_and_metadata.file
            else:
                file_and_line = ''

        # Add line number to filepath if needed.
        if line_level_tuple.line and ':' not in file_and_line:
            file_and_line += f':{line_level_tuple.line}'
        # Add extra log information avoiding duplicated data.
        metadata_fields = {
            k: v
            for k, v in message_and_metadata.fields.items()
            if k not in ['file', 'module', 'msg']
        }

        message = message_and_metadata.message
        if self.message_parser:
            message = self.message_parser(message)
        if self.timestamp_parser:
            timestamp = self.timestamp_parser(log_entry_proto.timestamp)
        else:
            timestamp = str(log_entry_proto.timestamp)
        log = Log(
            message=message,
            level=line_level_tuple.level,
            flags=log_entry_proto.flags,
            timestamp=timestamp,
            module_name=module_name,
            thread_name=decode_optionally_tokenized(
                self.detokenizer, log_entry_proto.thread
            ),
            source_name=self.source_name,
            file_and_line=file_and_line,
            metadata_fields=metadata_fields,
        )

        return log

    def _handle_log_drop_count(self, drop_count: int, reason: str) -> Log:
        """Creates a warning Log reporting drop_count dropped logs."""
        log_word = 'logs' if drop_count > 1 else 'log'
        log = Log(
            message=f'Dropped {drop_count} {log_word} due to {reason}',
            level=logging.WARNING,
            source_name=self.source_name,
        )
        return log

    def _calculate_dropped_logs(
        self, log_entries_proto: log_pb2.LogEntries
    ) -> int:
        """Returns the dropped-log count inferred from sequence ids.

        Also advances the expected sequence id past the received entries.
        A negative return means the sequence id went backwards.
        """
        # Count log messages received that don't use the dropped field.
        messages_received = sum(
            1 if not log_proto.dropped else 0
            for log_proto in log_entries_proto.entries
        )
        dropped_log_count = (
            log_entries_proto.first_entry_sequence_id
            - self._expected_log_sequence_id
        )
        self._expected_log_sequence_id = (
            log_entries_proto.first_entry_sequence_id + messages_received
        )
        return dropped_log_count