xref: /aosp_15_r20/external/pigweed/pw_log/py/log_decoder_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14
15"""Log decoder tests."""
16
17from dataclasses import dataclass
18import logging
19from random import randint
20from typing import Any
21from unittest import TestCase, main
22
23from pw_log.log_decoder import (
24    Log,
25    LogStreamDecoder,
26    log_decoded_log,
27    pw_status_code_to_name,
28    timestamp_parser_ms_since_boot,
29    timestamp_parser_ns_since_boot,
30)
31from pw_log.proto import log_pb2
32import pw_tokenizer
33
34_MESSAGE_NO_FILENAME = 'World'
35_MESSAGE_AND_ARGS_NO_FILENAME = f'■msg♦{_MESSAGE_NO_FILENAME}'
36_MESSAGE_TOKEN_NO_FILENAME = pw_tokenizer.tokens.pw_tokenizer_65599_hash(
37    _MESSAGE_AND_ARGS_NO_FILENAME
38)
39
40# Creating a database with tokenized information for the core to detokenize
41# tokenized log entries.
42_TOKEN_DATABASE = pw_tokenizer.tokens.Database(
43    [
44        pw_tokenizer.tokens.TokenizedStringEntry(0x01148A48, 'total_dropped'),
45        pw_tokenizer.tokens.TokenizedStringEntry(
46            0x03796798, 'min_queue_remaining'
47        ),
48        pw_tokenizer.tokens.TokenizedStringEntry(0x2E668CD6, 'Jello, world!'),
49        pw_tokenizer.tokens.TokenizedStringEntry(0x329481A2, 'parser_errors'),
50        pw_tokenizer.tokens.TokenizedStringEntry(0x7F35A9A5, 'TestName'),
51        pw_tokenizer.tokens.TokenizedStringEntry(0xCC6D3131, 'Jello?'),
52        pw_tokenizer.tokens.TokenizedStringEntry(
53            0x144C501D, '■msg♦SampleMessage■module♦MODULE■file♦file/path.cc'
54        ),
55        pw_tokenizer.tokens.TokenizedStringEntry(0x0000106A, 'ModuleOrMessage'),
56        pw_tokenizer.tokens.TokenizedStringEntry(
57            pw_tokenizer.tokens.pw_tokenizer_65599_hash(
58                '■msg♦World■module♦wifi■file♦/path/to/file.cc'
59            ),
60            '■msg♦World■module♦wifi■file♦/path/to/file.cc',
61        ),
62        pw_tokenizer.tokens.TokenizedStringEntry(
63            _MESSAGE_TOKEN_NO_FILENAME, _MESSAGE_AND_ARGS_NO_FILENAME
64        ),
65    ]
66)
67_DETOKENIZER = pw_tokenizer.Detokenizer(_TOKEN_DATABASE)
68
69_ZERO_NS_TIMESTAMP_STR = '00:00:00.000000'
70
71
72def _create_log_entry_with_tokenized_fields(
73    message: str, module: str, file: str, thread: str, line: int, level: int
74) -> log_pb2.LogEntry:
75    """Tokenizing tokenizable LogEntry fields to become a detoknized log."""
76    tokenized_message = pw_tokenizer.encode.encode_token_and_args(
77        pw_tokenizer.tokens.pw_tokenizer_65599_hash(message)
78    )
79    tokenized_module = pw_tokenizer.encode.encode_token_and_args(
80        pw_tokenizer.tokens.pw_tokenizer_65599_hash(module)
81    )
82    tokenized_file = pw_tokenizer.encode.encode_token_and_args(
83        pw_tokenizer.tokens.pw_tokenizer_65599_hash(file)
84    )
85    tokenized_thread = pw_tokenizer.encode.encode_token_and_args(
86        pw_tokenizer.tokens.pw_tokenizer_65599_hash(thread)
87    )
88
89    return log_pb2.LogEntry(
90        message=tokenized_message,
91        module=tokenized_module,
92        file=tokenized_file,
93        line_level=Log.pack_line_level(line, level),
94        thread=tokenized_thread,
95    )
96
97
98def _create_random_log_entry() -> log_pb2.LogEntry:
99    return log_pb2.LogEntry(
100        message=bytes(f'message {randint(1,100)}'.encode('utf-8')),
101        line_level=Log.pack_line_level(
102            randint(0, 2000), randint(logging.DEBUG, logging.CRITICAL)
103        ),
104        file=b'main.cc',
105        thread=bytes(f'thread {randint(1,5)}'.encode('utf-8')),
106    )
107
108
109def _create_drop_count_message_log_entry(
110    drop_count: int, reason: str = ''
111) -> log_pb2.LogEntry:
112    log_entry = log_pb2.LogEntry(dropped=drop_count)
113    if reason:
114        log_entry.message = bytes(reason.encode('utf-8'))
115    return log_entry
116
117
118class TestLogStreamDecoderBase(TestCase):
119    """Base Test class for LogStreamDecoder."""
120
121    def setUp(self) -> None:
122        """Set up logs decoder."""
123
124        def parse_pw_status(msg: str) -> str:
125            return pw_status_code_to_name(msg)
126
127        self.captured_logs: list[Log] = []
128
129        def decoded_log_handler(log: Log) -> None:
130            self.captured_logs.append(log)
131
132        self.decoder = LogStreamDecoder(
133            decoded_log_handler=decoded_log_handler,
134            detokenizer=_DETOKENIZER,
135            source_name='source',
136            timestamp_parser=timestamp_parser_ns_since_boot,
137            message_parser=parse_pw_status,
138        )
139
140    def _captured_logs_as_str(self) -> str:
141        return '\n'.join(map(str, self.captured_logs))
142
143
144class TestLogStreamDecoderDecodingFunctionality(TestLogStreamDecoderBase):
145    """Tests LogStreamDecoder decoding functionality."""
146
147    def test_parse_log_entry_valid_non_tokenized(self) -> None:
148        """Test that valid LogEntry protos are parsed correctly."""
149        expected_log = Log(
150            message='Hello',
151            file_and_line='my/path/file.cc:123',
152            level=logging.INFO,
153            source_name=self.decoder.source_name,
154            timestamp=_ZERO_NS_TIMESTAMP_STR,
155        )
156        result = self.decoder.parse_log_entry_proto(
157            log_pb2.LogEntry(
158                message=b'Hello',
159                file=b'my/path/file.cc',
160                line_level=Log.pack_line_level(123, logging.INFO),
161            )
162        )
163        self.assertEqual(result, expected_log)
164
165    def test_parse_log_entry_valid_packed_message(self) -> None:
166        """Test that valid LogEntry protos are parsed correctly."""
167        log_with_metadata_in_message = Log(
168            message='World',
169            file_and_line='/path/to/file.cc',
170            level=logging.DEBUG,
171            source_name=self.decoder.source_name,
172            module_name='wifi',
173            timestamp=_ZERO_NS_TIMESTAMP_STR,
174        )
175        result = self.decoder.parse_log_entry_proto(
176            log_pb2.LogEntry(
177                message=bytes(
178                    '■msg♦World■module♦wifi■file♦/path/to/file.cc'.encode(
179                        'utf-8'
180                    )
181                ),
182                line_level=Log.pack_line_level(0, logging.DEBUG),
183                timestamp=100,
184            )
185        )
186        self.assertEqual(result, log_with_metadata_in_message)
187
188    def test_parse_log_entry_valid_logs_drop_message(self) -> None:
189        """Test that valid LogEntry protos are parsed correctly."""
190        dropped_message = Log(
191            message='Dropped 30 logs due to buffer too small',
192            level=logging.WARNING,
193            source_name=self.decoder.source_name,
194        )
195        result = self.decoder.parse_log_entry_proto(
196            log_pb2.LogEntry(message=b'buffer too small', dropped=30)
197        )
198        self.assertEqual(result, dropped_message)
199
200    def test_parse_log_entry_valid_tokenized(self) -> None:
201        """Test that tokenized LogEntry protos are parsed correctly."""
202        message = 'Jello, world!'
203        module_name = 'TestName'
204        file = 'parser_errors'
205        thread_name = 'Jello?'
206        line = 123
207        level = logging.INFO
208
209        expected_log = Log(
210            message=message,
211            module_name=module_name,
212            file_and_line=file + ':123',
213            level=logging.INFO,
214            source_name=self.decoder.source_name,
215            timestamp=_ZERO_NS_TIMESTAMP_STR,
216            thread_name=thread_name,
217        )
218
219        log_entry = _create_log_entry_with_tokenized_fields(
220            message, module_name, file, thread_name, line, level
221        )
222        result = self.decoder.parse_log_entry_proto(log_entry)
223        self.assertEqual(result, expected_log, msg='Log was not detokenized')
224
225    def test_tokenized_contents_not_detokenized(self):
226        """Test fields with tokens not in the database are not decrypted."""
227        # The following strings do not have tokens in the device token db.
228        message_not_in_db = 'device is shutting down.'
229        module_name_not_in_db = 'Battery'
230        file_not_in_db = 'charger.cc'
231        thread_name_not_in_db = 'BatteryStatus'
232        line = 123
233        level = logging.INFO
234
235        log_entry = _create_log_entry_with_tokenized_fields(
236            message_not_in_db,
237            module_name_not_in_db,
238            file_not_in_db,
239            thread_name_not_in_db,
240            line,
241            level,
242        )
243        message = pw_tokenizer.proto.decode_optionally_tokenized(
244            _DETOKENIZER, log_entry.message
245        )
246        module = pw_tokenizer.proto.decode_optionally_tokenized(
247            _DETOKENIZER, log_entry.module
248        )
249        file = pw_tokenizer.proto.decode_optionally_tokenized(
250            _DETOKENIZER, log_entry.file
251        )
252        thread = pw_tokenizer.proto.decode_optionally_tokenized(
253            _DETOKENIZER, log_entry.thread
254        )
255        expected_log = Log(
256            message=message,
257            module_name=module,
258            file_and_line=file + ':123',
259            level=logging.INFO,
260            source_name=self.decoder.source_name,
261            timestamp=_ZERO_NS_TIMESTAMP_STR,
262            thread_name=thread,
263        )
264        result = self.decoder.parse_log_entry_proto(log_entry)
265        self.assertEqual(
266            result, expected_log, msg='Log was unexpectedly detokenized'
267        )
268
269    def test_extracting_log_entry_fields_from_tokenized_metadata(self):
270        """Test that tokenized metadata can be used to extract other fields."""
271        metadata = '■msg♦World■module♦wifi■file♦/path/to/file.cc'
272        thread_name = 'M0Log'
273
274        log_entry = log_pb2.LogEntry(
275            message=pw_tokenizer.encode.encode_token_and_args(
276                pw_tokenizer.tokens.pw_tokenizer_65599_hash(metadata)
277            ),
278            line_level=Log.pack_line_level(0, logging.DEBUG),
279            thread=bytes(thread_name.encode('utf-8')),
280        )
281
282        log_with_metadata_in_message = Log(
283            message='World',
284            file_and_line='/path/to/file.cc',
285            level=logging.DEBUG,
286            source_name=self.decoder.source_name,
287            module_name='wifi',
288            timestamp=_ZERO_NS_TIMESTAMP_STR,
289            thread_name=thread_name,
290        )
291
292        result = self.decoder.parse_log_entry_proto(log_entry)
293        self.assertEqual(
294            result, log_with_metadata_in_message, msg='Log was detokenized.'
295        )
296
297    def test_extracting_status_argument_from_log_message(self):
298        """Test extract status from log message."""
299        expected_log = Log(
300            message='Could not start flux capacitor: PERMISSION_DENIED',
301            file_and_line='my/path/file.cc:123',
302            level=logging.INFO,
303            source_name=self.decoder.source_name,
304            timestamp=_ZERO_NS_TIMESTAMP_STR,
305        )
306        result = self.decoder.parse_log_entry_proto(
307            log_pb2.LogEntry(
308                message=b'Could not start flux capacitor: pw::Status=7',
309                file=b'my/path/file.cc',
310                line_level=Log.pack_line_level(123, logging.INFO),
311            )
312        )
313        self.assertEqual(
314            result,
315            expected_log,
316            msg='Status was not extracted from log message.',
317        )
318
319        expected_log = Log(
320            message='Error connecting to server: UNAVAILABLE',
321            file_and_line='my/path/file.cc:123',
322            level=logging.INFO,
323            source_name=self.decoder.source_name,
324            timestamp=_ZERO_NS_TIMESTAMP_STR,
325        )
326        result = self.decoder.parse_log_entry_proto(
327            log_pb2.LogEntry(
328                message=b'Error connecting to server: pw::Status=14',
329                file=b'my/path/file.cc',
330                line_level=Log.pack_line_level(123, logging.INFO),
331            )
332        )
333        self.assertEqual(
334            result,
335            expected_log,
336            msg='Status was not extracted from log message.',
337        )
338
339    def test_extracting_status_with_improper_spacing(self):
340        """Test spaces before pw::Status are ignored."""
341        expected_log = Log(
342            message='Error connecting to server:UNAVAILABLE',
343            file_and_line='my/path/file.cc:123',
344            level=logging.INFO,
345            source_name=self.decoder.source_name,
346            timestamp=_ZERO_NS_TIMESTAMP_STR,
347        )
348
349        result = self.decoder.parse_log_entry_proto(
350            log_pb2.LogEntry(
351                message=b'Error connecting to server:pw::Status=14',
352                file=b'my/path/file.cc',
353                line_level=Log.pack_line_level(123, logging.INFO),
354            )
355        )
356        self.assertEqual(
357            result,
358            expected_log,
359            msg='Status was not extracted from log message.',
360        )
361
362        expected_log = Log(
363            message='Error connecting to server:    UNAVAILABLE',
364            file_and_line='my/path/file.cc:123',
365            level=logging.INFO,
366            source_name=self.decoder.source_name,
367            timestamp=_ZERO_NS_TIMESTAMP_STR,
368        )
369
370        result = self.decoder.parse_log_entry_proto(
371            log_pb2.LogEntry(
372                message=b'Error connecting to server:    pw::Status=14',
373                file=b'my/path/file.cc',
374                line_level=Log.pack_line_level(123, logging.INFO),
375            )
376        )
377        self.assertEqual(
378            result,
379            expected_log,
380            msg='Status was not extracted from log message.',
381        )
382
383    def test_not_extracting_status_extra_space_before_code(self):
384        """Test spaces after pw::Status are not allowed."""
385        expected_log = Log(
386            message='Could not start flux capacitor: pw::Status=    7',
387            file_and_line='my/path/file.cc:123',
388            level=logging.INFO,
389            source_name=self.decoder.source_name,
390            timestamp=_ZERO_NS_TIMESTAMP_STR,
391        )
392
393        result = self.decoder.parse_log_entry_proto(
394            log_pb2.LogEntry(
395                message=b'Could not start flux capacitor: pw::Status=    7',
396                file=b'my/path/file.cc',
397                line_level=Log.pack_line_level(123, logging.INFO),
398            )
399        )
400        self.assertEqual(
401            result,
402            expected_log,
403            msg='Status was not extracted from log message.',
404        )
405
406    def test_not_extracting_status_new_line_before_code(self):
407        """Test new line characters after pw::Status are not allowed."""
408        expected_log = Log(
409            message='Could not start flux capacitor: pw::Status=\n7',
410            file_and_line='my/path/file.cc:123',
411            level=logging.INFO,
412            source_name=self.decoder.source_name,
413            timestamp=_ZERO_NS_TIMESTAMP_STR,
414        )
415        result = self.decoder.parse_log_entry_proto(
416            log_pb2.LogEntry(
417                message=b'Could not start flux capacitor: pw::Status=\n7',
418                file=b'my/path/file.cc',
419                line_level=Log.pack_line_level(123, logging.INFO),
420            )
421        )
422        self.assertEqual(
423            result,
424            expected_log,
425            msg='Status was not extracted from log message.',
426        )
427
428    def test_not_extracting_status_from_log_message_with_improper_format(self):
429        """Test status not extracted from log message with incorrect format."""
430        expected_log = Log(
431            message='Could not start flux capacitor: pw::Status12',
432            file_and_line='my/path/file.cc:123',
433            level=logging.INFO,
434            source_name=self.decoder.source_name,
435            timestamp=_ZERO_NS_TIMESTAMP_STR,
436        )
437        result = self.decoder.parse_log_entry_proto(
438            log_pb2.LogEntry(
439                message=b'Could not start flux capacitor: pw::Status12',
440                file=b'my/path/file.cc',
441                line_level=Log.pack_line_level(123, logging.INFO),
442            )
443        )
444        self.assertEqual(
445            result,
446            expected_log,
447            msg='Status was not extracted from log message.',
448        )
449
450    def test_status_code_in_message_does_not_exist(self):
451        """Test status does not exist in pw_status."""
452        expected_log = Log(
453            message='Could not start flux capacitor: pw::Status=17',
454            file_and_line='my/path/file.cc:123',
455            level=logging.INFO,
456            source_name=self.decoder.source_name,
457            timestamp=_ZERO_NS_TIMESTAMP_STR,
458        )
459        result = self.decoder.parse_log_entry_proto(
460            log_pb2.LogEntry(
461                message=b'Could not start flux capacitor: pw::Status=17',
462                file=b'my/path/file.cc',
463                line_level=Log.pack_line_level(123, logging.INFO),
464            )
465        )
466        self.assertEqual(
467            result,
468            expected_log,
469            msg='Status was not extracted from log message.',
470        )
471
472    def test_status_code_in_message_is_negative(self):
473        """Test status code is negative."""
474        expected_log = Log(
475            message='Could not start flux capacitor: pw::Status=-1',
476            file_and_line='my/path/file.cc:123',
477            level=logging.INFO,
478            source_name=self.decoder.source_name,
479            timestamp=_ZERO_NS_TIMESTAMP_STR,
480        )
481        result = self.decoder.parse_log_entry_proto(
482            log_pb2.LogEntry(
483                message=b'Could not start flux capacitor: pw::Status=-1',
484                file=b'my/path/file.cc',
485                line_level=Log.pack_line_level(123, logging.INFO),
486            )
487        )
488        self.assertEqual(
489            result,
490            expected_log,
491            msg='Status was not extracted from log message.',
492        )
493
494    def test_status_code_is_name(self):
495        """Test if the status code format includes the name instead."""
496        expected_log = Log(
497            message='Cannot use flux capacitor: pw::Status=PERMISSION_DENIED',
498            file_and_line='my/path/file.cc:123',
499            level=logging.INFO,
500            source_name=self.decoder.source_name,
501            timestamp=_ZERO_NS_TIMESTAMP_STR,
502        )
503        result = self.decoder.parse_log_entry_proto(
504            log_pb2.LogEntry(
505                message=(
506                    b'Cannot use flux capacitor: pw::Status=PERMISSION_DENIED'
507                ),
508                file=b'my/path/file.cc',
509                line_level=Log.pack_line_level(123, logging.INFO),
510            )
511        )
512        self.assertEqual(
513            result,
514            expected_log,
515            msg='Status was not extracted from log message.',
516        )
517
518    def test_spelling_mistakes_with_status_keyword(self):
519        """Test spelling mistakes with status keyword."""
520        expected_log = Log(
521            message='Could not start flux capacitor: pw::Rtatus=12',
522            file_and_line='my/path/file.cc:123',
523            level=logging.INFO,
524            source_name=self.decoder.source_name,
525            timestamp=_ZERO_NS_TIMESTAMP_STR,
526        )
527        result = self.decoder.parse_log_entry_proto(
528            log_pb2.LogEntry(
529                message=b'Could not start flux capacitor: pw::Rtatus=12',
530                file=b'my/path/file.cc',
531                line_level=Log.pack_line_level(123, logging.INFO),
532            )
533        )
534        self.assertEqual(
535            result,
536            expected_log,
537            msg='Status was not extracted from log message.',
538        )
539
540    def test_spelling_mistakes_with_status_keyword_lowercase_s(self):
541        """Test spelling mistakes with status keyword."""
542        expected_log = Log(
543            message='Could not start flux capacitor: pw::status=13',
544            file_and_line='my/path/file.cc:123',
545            level=logging.INFO,
546            source_name=self.decoder.source_name,
547            timestamp=_ZERO_NS_TIMESTAMP_STR,
548        )
549        result = self.decoder.parse_log_entry_proto(
550            log_pb2.LogEntry(
551                message=b'Could not start flux capacitor: pw::status=13',
552                file=b'my/path/file.cc',
553                line_level=Log.pack_line_level(123, logging.INFO),
554            )
555        )
556        self.assertEqual(
557            result,
558            expected_log,
559            msg='Status was not extracted from log message.',
560        )
561
562    def test_status_code_at_beginning_of_message(self):
563        """Test embedded status argument is found."""
564        expected_log = Log(
565            message='UNAVAILABLE to connect to server.',
566            file_and_line='my/path/file.cc:123',
567            level=logging.INFO,
568            source_name=self.decoder.source_name,
569            timestamp=_ZERO_NS_TIMESTAMP_STR,
570        )
571        result = self.decoder.parse_log_entry_proto(
572            log_pb2.LogEntry(
573                message=b'pw::Status=14 to connect to server.',
574                file=b'my/path/file.cc',
575                line_level=Log.pack_line_level(123, logging.INFO),
576            )
577        )
578        self.assertEqual(
579            result,
580            expected_log,
581            msg='Status was not extracted from log message.',
582        )
583
584    def test_status_code_in_the_middle_of_message(self):
585        """Test embedded status argument is found."""
586        expected_log = Log(
587            message='Connection error: UNAVAILABLE connecting to server.',
588            file_and_line='my/path/file.cc:123',
589            level=logging.INFO,
590            source_name=self.decoder.source_name,
591            timestamp=_ZERO_NS_TIMESTAMP_STR,
592        )
593        result = self.decoder.parse_log_entry_proto(
594            log_pb2.LogEntry(
595                message=(
596                    b'Connection error: pw::Status=14 connecting to server.'
597                ),
598                file=b'my/path/file.cc',
599                line_level=Log.pack_line_level(123, logging.INFO),
600            )
601        )
602        self.assertEqual(
603            result,
604            expected_log,
605            msg='Status was not extracted from log message.',
606        )
607
608    def test_status_code_with_no_surrounding_spaces(self):
609        """Test embedded status argument is found."""
610        expected_log = Log(
611            message='Connection error:UNAVAILABLEconnecting to server.',
612            file_and_line='my/path/file.cc:123',
613            level=logging.INFO,
614            source_name=self.decoder.source_name,
615            timestamp=_ZERO_NS_TIMESTAMP_STR,
616        )
617        result = self.decoder.parse_log_entry_proto(
618            log_pb2.LogEntry(
619                message=b'Connection error:pw::Status=14connecting to server.',
620                file=b'my/path/file.cc',
621                line_level=Log.pack_line_level(123, logging.INFO),
622            )
623        )
624        self.assertEqual(
625            result,
626            expected_log,
627            msg='Status was not extracted from log message.',
628        )
629
630    def test_multiple_status_arguments_in_log_message(self):
631        """Test replacement of multiple status arguments into status string."""
632        expected_log = Log(
633            message='Connection error: UNAVAILABLE and PERMISSION_DENIED.',
634            file_and_line='my/path/file.cc:123',
635            level=logging.INFO,
636            source_name=self.decoder.source_name,
637            timestamp=_ZERO_NS_TIMESTAMP_STR,
638        )
639        result = self.decoder.parse_log_entry_proto(
640            log_pb2.LogEntry(
641                message=b'Connection error: pw::Status=14 and pw::Status=7.',
642                file=b'my/path/file.cc',
643                line_level=Log.pack_line_level(123, logging.INFO),
644            )
645        )
646        self.assertEqual(
647            result,
648            expected_log,
649            msg='Status was not extracted from log message.',
650        )
651
652    def test_no_filename_in_message_parses_successfully(self):
653        """Test that if the file name is not present the log entry is parsed."""
654        thread_name = 'thread'
655
656        log_entry = log_pb2.LogEntry(
657            message=pw_tokenizer.encode.encode_token_and_args(
658                _MESSAGE_TOKEN_NO_FILENAME
659            ),
660            line_level=Log.pack_line_level(123, logging.DEBUG),
661            thread=bytes(thread_name.encode('utf-8')),
662        )
663        expected_log = Log(
664            message=_MESSAGE_NO_FILENAME,
665            file_and_line=':123',
666            level=logging.DEBUG,
667            source_name=self.decoder.source_name,
668            timestamp=_ZERO_NS_TIMESTAMP_STR,
669            thread_name=thread_name,
670        )
671        result = self.decoder.parse_log_entry_proto(log_entry)
672        self.assertEqual(result, expected_log)
673
674    def test_log_decoded_log(self):
675        """Test that the logger correctly formats a decoded log."""
676        test_log = Log(
677            message="SampleMessage",
678            level=logging.DEBUG,
679            timestamp='1:30',
680            module_name="MyModule",
681            source_name="MySource",
682            thread_name="MyThread",
683            file_and_line='my_file.cc:123',
684            metadata_fields={'field1': 432, 'field2': 'value'},
685        )
686
687        class CapturingLogger(logging.Logger):
688            """Captures values passed to log().
689
690            Tests that calls to log() have the correct level, extra arguments
691            and the message format string and arguments match.
692            """
693
694            @dataclass(frozen=True)
695            class LoggerLog:
696                """Represents a process log() call."""
697
698                level: int
699                message: str
700                kwargs: Any
701
702            def __init__(self):
703                super().__init__(name="CapturingLogger")
704                self.log_calls: list[CapturingLogger.LoggerLog] = []
705
706            def log(self, level, msg, *args, **kwargs) -> None:
707                log = CapturingLogger.LoggerLog(
708                    level=level, message=msg % args, kwargs=kwargs
709                )
710                self.log_calls.append(log)
711
712        test_logger = CapturingLogger()
713        log_decoded_log(test_log, test_logger)
714        self.assertEqual(len(test_logger.log_calls), 1)
715        self.assertEqual(test_logger.log_calls[0].level, test_log.level)
716        self.assertEqual(
717            test_logger.log_calls[0].message,
718            '[%s] %s %s %s %s'
719            % (
720                test_log.source_name,
721                test_log.module_name,
722                test_log.timestamp,
723                test_log.message,
724                test_log.file_and_line,
725            ),
726        )
727        self.assertEqual(
728            test_logger.log_calls[0].kwargs['extra']['extra_metadata_fields'],
729            test_log.metadata_fields,
730        )
731
732
733class TestLogStreamDecoderLogDropDetectionFunctionality(
734    TestLogStreamDecoderBase
735):
736    """Tests LogStreamDecoder log drop detection functionality."""
737
738    def test_log_drops_transport_error(self):
739        """Tests log drops at transport."""
740        log_entry_in_log_entries_1 = 3
741        log_entries_1 = log_pb2.LogEntries(
742            first_entry_sequence_id=0,
743            entries=[
744                _create_random_log_entry()
745                for _ in range(log_entry_in_log_entries_1)
746            ],
747        )
748        self.decoder.parse_log_entries_proto(log_entries_1)
749        # Assume a second LogEntries was dropped with 5 log entries. I.e. a
750        # log_entries_2 with sequence_ie = 4 and 5 log entries.
751        log_entry_in_log_entries_2 = 5
752        log_entry_in_log_entries_3 = 3
753        log_entries_3 = log_pb2.LogEntries(
754            first_entry_sequence_id=log_entry_in_log_entries_2
755            + log_entry_in_log_entries_3,
756            entries=[
757                _create_random_log_entry()
758                for _ in range(log_entry_in_log_entries_3)
759            ],
760        )
761        self.decoder.parse_log_entries_proto(log_entries_3)
762
763        # The drop message is placed where the dropped logs were detected.
764        self.assertEqual(
765            len(self.captured_logs),
766            log_entry_in_log_entries_1 + 1 + log_entry_in_log_entries_3,
767            msg=(
768                'Unexpected number of messages received: '
769                f'{self._captured_logs_as_str()}'
770            ),
771        )
772        self.assertEqual(
773            (
774                f'Dropped {log_entry_in_log_entries_2} logs due to '
775                f'{LogStreamDecoder.DROP_REASON_LOSS_AT_TRANSPORT}'
776            ),
777            self.captured_logs[log_entry_in_log_entries_1].message,
778        )
779
780    def test_log_drops_source_not_connected(self):
781        """Tests log drops when source of the logs was not connected."""
782        log_entry_in_log_entries = 4
783        drop_count = 7
784        log_entries = log_pb2.LogEntries(
785            first_entry_sequence_id=drop_count,
786            entries=[
787                _create_random_log_entry()
788                for _ in range(log_entry_in_log_entries)
789            ],
790        )
791        self.decoder.parse_log_entries_proto(log_entries)
792
793        # The drop message is placed where the log drops was detected.
794        self.assertEqual(
795            len(self.captured_logs),
796            1 + log_entry_in_log_entries,
797            msg=(
798                'Unexpected number of messages received: '
799                f'{self._captured_logs_as_str()}'
800            ),
801        )
802        self.assertEqual(
803            (
804                f'Dropped {drop_count} logs due to '
805                f'{LogStreamDecoder.DROP_REASON_SOURCE_NOT_CONNECTED}'
806            ),
807            self.captured_logs[0].message,
808        )
809
810    def test_log_drops_source_enqueue_failure_no_message(self):
811        """Tests log drops when source reports log drops."""
812        drop_count = 5
813        log_entries = log_pb2.LogEntries(
814            first_entry_sequence_id=0,
815            entries=[
816                _create_random_log_entry(),
817                _create_random_log_entry(),
818                _create_drop_count_message_log_entry(drop_count),
819                _create_random_log_entry(),
820            ],
821        )
822        self.decoder.parse_log_entries_proto(log_entries)
823
824        # The drop message is placed where the log drops was detected.
825        self.assertEqual(
826            len(self.captured_logs),
827            4,
828            msg=(
829                'Unexpected number of messages received: '
830                f'{self._captured_logs_as_str()}'
831            ),
832        )
833        self.assertEqual(
834            (
835                f'Dropped {drop_count} logs due to '
836                f'{LogStreamDecoder.DROP_REASON_SOURCE_ENQUEUE_FAILURE}'
837            ),
838            self.captured_logs[2].message,
839        )
840
841    def test_log_drops_source_enqueue_failure_with_message(self):
842        """Tests log drops when source reports log drops."""
843        drop_count = 8
844        reason = 'Flux Capacitor exploded'
845        log_entries = log_pb2.LogEntries(
846            first_entry_sequence_id=0,
847            entries=[
848                _create_random_log_entry(),
849                _create_random_log_entry(),
850                _create_drop_count_message_log_entry(drop_count, reason),
851                _create_random_log_entry(),
852            ],
853        )
854        self.decoder.parse_log_entries_proto(log_entries)
855
856        # The drop message is placed where the log drops was detected.
857        self.assertEqual(
858            len(self.captured_logs),
859            4,
860            msg=(
861                'Unexpected number of messages received: '
862                f'{self._captured_logs_as_str()}'
863            ),
864        )
865        self.assertEqual(
866            f'Dropped {drop_count} logs due to {reason.lower()}',
867            self.captured_logs[2].message,
868        )
869
870
871class TestTimestampFormatting(TestCase):
872    """Tests for log_decoder.timestamp_parser_* functions."""
873
874    maxDiff = None
875
876    def test_zero_millisecond_resolution(self) -> None:
877        # Check for 3 trailing 0s on a millisecond resolution timestamp.
878        self.assertEqual(
879            '00:45:45.587', timestamp_parser_ms_since_boot(2745587)
880        )
881        self.assertEqual(
882            '00:45:45.000', timestamp_parser_ms_since_boot(2745000)
883        )
884
885    def test_zero_nanosecond_resolution(self) -> None:
886        # Check for 6 trailing 0s on a nanosecond resolution timestamp.
887        self.assertEqual(
888            '00:45:45.587123', timestamp_parser_ns_since_boot(2745587123456)
889        )
890        self.assertEqual(
891            '00:45:45.000000', timestamp_parser_ns_since_boot(2745000000000)
892        )
893
894
895if __name__ == '__main__':
896    main()
897