xref: /aosp_15_r20/external/pigweed/pw_trace/py/trace_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2020 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests the trace module."""
16
17import json
18import struct
19import unittest
20
21from pw_trace import trace
22
23test_events = [
24    trace.TraceEvent(trace.TraceType.INSTANTANEOUS, "m1", "L1", 1),
25    trace.TraceEvent(trace.TraceType.INSTANTANEOUS_GROUP, "m2", "L2", 2, "G2"),
26    trace.TraceEvent(trace.TraceType.ASYNC_STEP, "m3", "L3", 3, "G3", 103),
27    trace.TraceEvent(trace.TraceType.DURATION_START, "m4", "L4", 4),
28    trace.TraceEvent(trace.TraceType.DURATION_GROUP_START, "m5", "L5", 5, "G5"),
29    trace.TraceEvent(trace.TraceType.ASYNC_START, "m6", "L6", 6, "G6", 106),
30    trace.TraceEvent(trace.TraceType.DURATION_END, "m7", "L7", 7),
31    trace.TraceEvent(trace.TraceType.DURATION_GROUP_END, "m8", "L8", 8, "G8"),
32    trace.TraceEvent(trace.TraceType.ASYNC_END, "m9", "L9", 9, "G9", 109),
33]
34
35test_json = [
36    {"ph": "I", "pid": "m1", "name": "L1", "ts": 1, "s": "p"},
37    {"ph": "I", "pid": "m2", "tid": "G2", "name": "L2", "ts": 2, "s": "t"},
38    {
39        "ph": "n",
40        "pid": "m3",
41        "tid": "G3",
42        "name": "L3",
43        "ts": 3,
44        "scope": "G3",
45        "cat": "m3",
46        "id": 103,
47        "args": {"id": 103},
48    },
49    {"ph": "B", "pid": "m4", "tid": "L4", "name": "L4", "ts": 4},
50    {"ph": "B", "pid": "m5", "tid": "G5", "name": "L5", "ts": 5},
51    {
52        "ph": "b",
53        "pid": "m6",
54        "tid": "G6",
55        "name": "L6",
56        "ts": 6,
57        "scope": "G6",
58        "cat": "m6",
59        "id": 106,
60        "args": {"id": 106},
61    },
62    {"ph": "E", "pid": "m7", "tid": "L7", "name": "L7", "ts": 7},
63    {"ph": "E", "pid": "m8", "tid": "G8", "name": "L8", "ts": 8},
64    {
65        "ph": "e",
66        "pid": "m9",
67        "tid": "G9",
68        "name": "L9",
69        "ts": 9,
70        "scope": "G9",
71        "cat": "m9",
72        "id": 109,
73        "args": {"id": 109},
74    },
75]
76
77
78class TestTraceGenerateJson(unittest.TestCase):
79    """Tests generate json with various events."""
80
81    def test_generate_single_json_event(self):
82        event = trace.TraceEvent(
83            event_type=trace.TraceType.INSTANTANEOUS,
84            module="module",
85            label="label",
86            timestamp_us=10,
87        )
88        json_lines = trace.generate_trace_json([event])
89        self.assertEqual(1, len(json_lines))
90        self.assertEqual(
91            json.loads(json_lines[0]),
92            {"ph": "I", "pid": "module", "name": "label", "ts": 10, "s": "p"},
93        )
94
95    def test_generate_multiple_json_events(self):
96        json_lines = trace.generate_trace_json(test_events)
97        self.assertEqual(len(test_json), len(json_lines))
98        for actual, expected in zip(json_lines, test_json):
99            self.assertEqual(expected, json.loads(actual))
100
101    def test_generate_json_data_arg_label(self):
102        event = trace.TraceEvent(
103            event_type=trace.TraceType.INSTANTANEOUS,
104            module="module",
105            label="",  # Is replaced by data string
106            timestamp_us=10,
107            has_data=True,
108            data_fmt="@pw_arg_label",
109            data=bytes("arg", "utf-8"),
110        )
111        json_lines = trace.generate_trace_json([event])
112        self.assertEqual(1, len(json_lines))
113        self.assertEqual(
114            json.loads(json_lines[0]),
115            {"ph": "I", "pid": "module", "name": "arg", "ts": 10, "s": "p"},
116        )
117
118    def test_generate_json_data_arg_group(self):
119        event = trace.TraceEvent(
120            event_type=trace.TraceType.INSTANTANEOUS_GROUP,
121            module="module",
122            label="label",
123            timestamp_us=10,
124            has_data=True,
125            data_fmt="@pw_arg_group",
126            data=bytes("arg", "utf-8"),
127        )
128        json_lines = trace.generate_trace_json([event])
129        self.assertEqual(1, len(json_lines))
130        self.assertEqual(
131            json.loads(json_lines[0]),
132            {
133                "ph": "I",
134                "pid": "module",
135                "name": "label",
136                "tid": "arg",
137                "ts": 10,
138                "s": "t",
139            },
140        )
141
142    def test_generate_json_data_counter(self):
143        event = trace.TraceEvent(
144            event_type=trace.TraceType.INSTANTANEOUS,
145            module="module",
146            label="counter",
147            timestamp_us=10,
148            has_data=True,
149            data_fmt="@pw_arg_counter",
150            data=(5).to_bytes(4, byteorder="little"),
151        )
152        json_lines = trace.generate_trace_json([event])
153        self.assertEqual(1, len(json_lines))
154        self.assertEqual(
155            json.loads(json_lines[0]),
156            {
157                "ph": "C",
158                "pid": "module",
159                "name": "counter",
160                "ts": 10,
161                "s": "p",
162                "args": {"counter": 5},
163            },
164        )
165
166    def test_generate_json_data_struct_fmt_single(self):
167        event = trace.TraceEvent(
168            event_type=trace.TraceType.INSTANTANEOUS,
169            module="module",
170            label="counter",
171            timestamp_us=10,
172            has_data=True,
173            data_fmt="@pw_py_struct_fmt:H",
174            data=(5).to_bytes(2, byteorder="little"),
175        )
176        json_lines = trace.generate_trace_json([event])
177        self.assertEqual(1, len(json_lines))
178        self.assertEqual(
179            json.loads(json_lines[0]),
180            {
181                "ph": "I",
182                "pid": "module",
183                "name": "counter",
184                "ts": 10,
185                "s": "p",
186                "args": {"data_0": 5},
187            },
188        )
189
190    def test_generate_json_data_struct_fmt_multi(self):
191        event = trace.TraceEvent(
192            event_type=trace.TraceType.INSTANTANEOUS,
193            module="module",
194            label="counter",
195            timestamp_us=10,
196            has_data=True,
197            data_fmt="@pw_py_struct_fmt:Hl3s",
198            data=struct.pack("<Hl3s", 5, 2, b'abc'),
199        )
200        json_lines = trace.generate_trace_json([event])
201        self.assertEqual(1, len(json_lines))
202        self.assertEqual(
203            json.loads(json_lines[0]),
204            {
205                "ph": "I",
206                "pid": "module",
207                "name": "counter",
208                "ts": 10,
209                "s": "p",
210                "args": {"data_0": 5, "data_1": 2, "data_2": 'abc'},
211            },
212        )
213
214    def test_generate_error_json_data_struct_invalid_small_buffer(self):
215        event = trace.TraceEvent(
216            event_type=trace.TraceType.INSTANTANEOUS,
217            module="module",
218            label="counter",
219            timestamp_us=10,
220            has_data=True,
221            data_fmt="@pw_py_struct_fmt:Hl",
222            data=struct.pack("<H", 5),
223        )
224        json_lines = trace.generate_trace_json([event])
225        self.assertEqual(1, len(json_lines))
226        self.assertEqual(
227            json.loads(json_lines[0]),
228            {
229                "ph": "I",
230                "pid": "module",
231                "name": "counter",
232                "ts": 10,
233                "s": "p",
234                "args": {
235                    "error": f"Mismatched struct/data format {event.data_fmt} "
236                    f"expected data len {struct.calcsize('<Hl')} data "
237                    f"{event.data.hex()} data len {len(event.data)}"
238                },
239            },
240        )
241
242    def test_generate_error_json_data_struct_invalid_large_buffer(self):
243        event = trace.TraceEvent(
244            event_type=trace.TraceType.INSTANTANEOUS,
245            module="module",
246            label="counter",
247            timestamp_us=10,
248            has_data=True,
249            data_fmt="@pw_py_struct_fmt:Hl",
250            data=struct.pack("<Hll", 5, 2, 5),
251        )
252        json_lines = trace.generate_trace_json([event])
253        self.assertEqual(1, len(json_lines))
254        self.assertEqual(
255            json.loads(json_lines[0]),
256            {
257                "ph": "I",
258                "pid": "module",
259                "name": "counter",
260                "ts": 10,
261                "s": "p",
262                "args": {
263                    "error": f"Mismatched struct/data format {event.data_fmt} "
264                    f"expected data len {struct.calcsize('<Hl')} data "
265                    f"{event.data.hex()} data len {len(event.data)}"
266                },
267            },
268        )
269
270    def test_generate_json_data_map_fmt_single(self):
271        event = trace.TraceEvent(
272            event_type=trace.TraceType.INSTANTANEOUS,
273            module="module",
274            label="label",
275            timestamp_us=10,
276            has_data=True,
277            data_fmt="@pw_py_map_fmt:{Field:l}",
278            data=struct.pack("<l", 20),
279        )
280        json_lines = trace.generate_trace_json([event])
281        self.assertEqual(1, len(json_lines))
282        self.assertEqual(
283            json.loads(json_lines[0]),
284            {
285                "ph": "I",
286                "pid": "module",
287                "name": "label",
288                "ts": 10,
289                "s": "p",
290                "args": {"Field": 20},
291            },
292        )
293
294    def test_generate_json_data_map_fmt_multi(self):
295        event = trace.TraceEvent(
296            event_type=trace.TraceType.INSTANTANEOUS,
297            module="module",
298            label="label",
299            timestamp_us=10,
300            has_data=True,
301            data_fmt="@pw_py_map_fmt:{Field: l, Field2: l , Field3: 3s}",
302            data=struct.pack("<ll3s", 20, 40, b'abc'),
303        )
304        json_lines = trace.generate_trace_json([event])
305        self.assertEqual(1, len(json_lines))
306        self.assertEqual(
307            json.loads(json_lines[0]),
308            {
309                "ph": "I",
310                "pid": "module",
311                "name": "label",
312                "ts": 10,
313                "s": "p",
314                "args": {"Field": 20, "Field2": 40, "Field3": 'abc'},
315            },
316        )
317
318    def test_generate_error_json_data_map_bad_fmt(self):
319        event = trace.TraceEvent(
320            event_type=trace.TraceType.INSTANTANEOUS,
321            module="module",
322            label="label",
323            timestamp_us=10,
324            has_data=True,
325            data_fmt="@pw_py_map_fmt:{Field;l,Field2;l}",
326            data=struct.pack("<ll", 20, 40),
327        )
328        json_lines = trace.generate_trace_json([event])
329        self.assertEqual(1, len(json_lines))
330        self.assertEqual(
331            json.loads(json_lines[0]),
332            {
333                "ph": "I",
334                "pid": "module",
335                "name": "label",
336                "ts": 10,
337                "s": "p",
338                "args": {"error": f"Invalid map format {event.data_fmt}"},
339            },
340        )
341
342    def test_generate_error_json_data_map_invalid_small_buffer(self):
343        event = trace.TraceEvent(
344            event_type=trace.TraceType.INSTANTANEOUS,
345            module="module",
346            label="label",
347            timestamp_us=10,
348            has_data=True,
349            data_fmt="@pw_py_map_fmt:{Field:l,Field2:l}",
350            data=struct.pack("<l", 20),
351        )
352        json_lines = trace.generate_trace_json([event])
353        self.assertEqual(1, len(json_lines))
354        self.assertEqual(
355            json.loads(json_lines[0]),
356            {
357                "ph": "I",
358                "pid": "module",
359                "name": "label",
360                "ts": 10,
361                "s": "p",
362                "args": {
363                    "error": f"Mismatched map/data format {event.data_fmt} "
364                    f"expected data len {struct.calcsize('<ll')} data "
365                    f"{event.data.hex()} data len {len(event.data)}"
366                },
367            },
368        )
369
370    def test_generate_error_json_data_map_invalid_large_buffer(self):
371        event = trace.TraceEvent(
372            event_type=trace.TraceType.INSTANTANEOUS,
373            module="module",
374            label="label",
375            timestamp_us=10,
376            has_data=True,
377            data_fmt="@pw_py_map_fmt:{Field:H,Field2:H}",
378            data=struct.pack("<ll", 20, 40),
379        )
380        json_lines = trace.generate_trace_json([event])
381        self.assertEqual(1, len(json_lines))
382        self.assertEqual(
383            json.loads(json_lines[0]),
384            {
385                "ph": "I",
386                "pid": "module",
387                "name": "label",
388                "ts": 10,
389                "s": "p",
390                "args": {
391                    "error": f"Mismatched map/data format {event.data_fmt} "
392                    f"expected data len {struct.calcsize('<HH')} data "
393                    f"{event.data.hex()} data len {len(event.data)}"
394                },
395            },
396        )
397
398
399if __name__ == '__main__':
400    unittest.main()
401