#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests the trace module."""

import json
import struct
import unittest

from pw_trace import trace

# One event per trace type. Entry N here is expected to serialize to entry N
# of test_json below.
test_events = [
    trace.TraceEvent(trace.TraceType.INSTANTANEOUS, "m1", "L1", 1),
    trace.TraceEvent(trace.TraceType.INSTANTANEOUS_GROUP, "m2", "L2", 2, "G2"),
    trace.TraceEvent(trace.TraceType.ASYNC_STEP, "m3", "L3", 3, "G3", 103),
    trace.TraceEvent(trace.TraceType.DURATION_START, "m4", "L4", 4),
    trace.TraceEvent(trace.TraceType.DURATION_GROUP_START, "m5", "L5", 5, "G5"),
    trace.TraceEvent(trace.TraceType.ASYNC_START, "m6", "L6", 6, "G6", 106),
    trace.TraceEvent(trace.TraceType.DURATION_END, "m7", "L7", 7),
    trace.TraceEvent(trace.TraceType.DURATION_GROUP_END, "m8", "L8", 8, "G8"),
    trace.TraceEvent(trace.TraceType.ASYNC_END, "m9", "L9", 9, "G9", 109),
]

# Expected decoded JSON objects, in the same order as test_events.
test_json = [
    {"ph": "I", "pid": "m1", "name": "L1", "ts": 1, "s": "p"},
    {"ph": "I", "pid": "m2", "tid": "G2", "name": "L2", "ts": 2, "s": "t"},
    {
        "ph": "n",
        "pid": "m3",
        "tid": "G3",
        "name": "L3",
        "ts": 3,
        "scope": "G3",
        "cat": "m3",
        "id": 103,
        "args": {"id": 103},
    },
    {"ph": "B", "pid": "m4", "tid": "L4", "name": "L4", "ts": 4},
    {"ph": "B", "pid": "m5", "tid": "G5", "name": "L5", "ts": 5},
    {
        "ph": "b",
        "pid": "m6",
        "tid": "G6",
        "name": "L6",
        "ts": 6,
        "scope": "G6",
        "cat": "m6",
        "id": 106,
        "args": {"id": 106},
    },
    {"ph": "E", "pid": "m7", "tid": "L7", "name": "L7", "ts": 7},
    {"ph": "E", "pid": "m8", "tid": "G8", "name": "L8", "ts": 8},
    {
        "ph": "e",
        "pid": "m9",
        "tid": "G9",
        "name": "L9",
        "ts": 9,
        "scope": "G9",
        "cat": "m9",
        "id": 109,
        "args": {"id": 109},
    },
]


def _data_event(
    label,
    data_fmt,
    data,
    event_type=trace.TraceType.INSTANTANEOUS,
):
    """Builds a data-carrying event with the fixed module and timestamp.

    Every data test in this file uses module "module", timestamp 10 and
    has_data=True; only the label, the data format string, the payload and
    (once) the event type vary.
    """
    return trace.TraceEvent(
        event_type=event_type,
        module="module",
        label=label,
        timestamp_us=10,
        has_data=True,
        data_fmt=data_fmt,
        data=data,
    )


class TestTraceGenerateJson(unittest.TestCase):
    """Checks trace.generate_trace_json output against expected JSON."""

    def _generate_one(self, event):
        """Serializes one event and returns its decoded JSON object.

        Asserts that exactly one JSON line was produced before decoding it.
        """
        produced = trace.generate_trace_json([event])
        self.assertEqual(1, len(produced))
        return json.loads(produced[0])

    def test_generate_single_json_event(self):
        """A plain instantaneous event without data yields one JSON object."""
        plain_event = trace.TraceEvent(
            event_type=trace.TraceType.INSTANTANEOUS,
            module="module",
            label="label",
            timestamp_us=10,
        )
        self.assertEqual(
            self._generate_one(plain_event),
            {"ph": "I", "pid": "module", "name": "label", "ts": 10, "s": "p"},
        )

    def test_generate_multiple_json_events(self):
        """Each entry of test_events serializes to its test_json entry."""
        produced = trace.generate_trace_json(test_events)
        self.assertEqual(len(test_json), len(produced))
        # Lines are decoded lazily, one per comparison, in order.
        for decoded, wanted in zip(map(json.loads, produced), test_json):
            self.assertEqual(wanted, decoded)

    def test_generate_json_data_arg_label(self):
        """With @pw_arg_label the data string is expected as the name."""
        event = _data_event(
            "",  # Is replaced by data string
            "@pw_arg_label",
            bytes("arg", "utf-8"),
        )
        self.assertEqual(
            self._generate_one(event),
            {"ph": "I", "pid": "module", "name": "arg", "ts": 10, "s": "p"},
        )

    def test_generate_json_data_arg_group(self):
        """With @pw_arg_group the data string is expected as the tid."""
        event = _data_event(
            "label",
            "@pw_arg_group",
            bytes("arg", "utf-8"),
            event_type=trace.TraceType.INSTANTANEOUS_GROUP,
        )
        self.assertEqual(
            self._generate_one(event),
            {
                "ph": "I",
                "pid": "module",
                "name": "label",
                "tid": "arg",
                "ts": 10,
                "s": "t",
            },
        )

    def test_generate_json_data_counter(self):
        """With @pw_arg_counter a "C" event carrying the value is expected."""
        event = _data_event(
            "counter",
            "@pw_arg_counter",
            (5).to_bytes(4, byteorder="little"),
        )
        self.assertEqual(
            self._generate_one(event),
            {
                "ph": "C",
                "pid": "module",
                "name": "counter",
                "ts": 10,
                "s": "p",
                "args": {"counter": 5},
            },
        )

    def test_generate_json_data_struct_fmt_single(self):
        """A one-field struct format is expected to fill args["data_0"]."""
        event = _data_event(
            "counter",
            "@pw_py_struct_fmt:H",
            (5).to_bytes(2, byteorder="little"),
        )
        self.assertEqual(
            self._generate_one(event),
            {
                "ph": "I",
                "pid": "module",
                "name": "counter",
                "ts": 10,
                "s": "p",
                "args": {"data_0": 5},
            },
        )

    def test_generate_json_data_struct_fmt_multi(self):
        """A multi-field struct format is expected to fill data_0..data_2."""
        event = _data_event(
            "counter",
            "@pw_py_struct_fmt:Hl3s",
            struct.pack("<Hl3s", 5, 2, b"abc"),
        )
        self.assertEqual(
            self._generate_one(event),
            {
                "ph": "I",
                "pid": "module",
                "name": "counter",
                "ts": 10,
                "s": "p",
                "args": {"data_0": 5, "data_1": 2, "data_2": "abc"},
            },
        )

    def test_generate_error_json_data_struct_invalid_small_buffer(self):
        """A payload shorter than the struct format expects an error arg."""
        event = _data_event(
            "counter",
            "@pw_py_struct_fmt:Hl",
            struct.pack("<H", 5),
        )
        decoded = self._generate_one(event)
        mismatch_text = (
            f"Mismatched struct/data format {event.data_fmt} "
            f"expected data len {struct.calcsize('<Hl')} data "
            f"{event.data.hex()} data len {len(event.data)}"
        )
        self.assertEqual(
            decoded,
            {
                "ph": "I",
                "pid": "module",
                "name": "counter",
                "ts": 10,
                "s": "p",
                "args": {"error": mismatch_text},
            },
        )

    def test_generate_error_json_data_struct_invalid_large_buffer(self):
        """A payload longer than the struct format expects an error arg."""
        event = _data_event(
            "counter",
            "@pw_py_struct_fmt:Hl",
            struct.pack("<Hll", 5, 2, 5),
        )
        decoded = self._generate_one(event)
        mismatch_text = (
            f"Mismatched struct/data format {event.data_fmt} "
            f"expected data len {struct.calcsize('<Hl')} data "
            f"{event.data.hex()} data len {len(event.data)}"
        )
        self.assertEqual(
            decoded,
            {
                "ph": "I",
                "pid": "module",
                "name": "counter",
                "ts": 10,
                "s": "p",
                "args": {"error": mismatch_text},
            },
        )

    def test_generate_json_data_map_fmt_single(self):
        """A one-entry map format is expected to name the arg after its key."""
        event = _data_event(
            "label",
            "@pw_py_map_fmt:{Field:l}",
            struct.pack("<l", 20),
        )
        self.assertEqual(
            self._generate_one(event),
            {
                "ph": "I",
                "pid": "module",
                "name": "label",
                "ts": 10,
                "s": "p",
                "args": {"Field": 20},
            },
        )

    def test_generate_json_data_map_fmt_multi(self):
        """A multi-entry map format with stray spaces expects all three args."""
        event = _data_event(
            "label",
            "@pw_py_map_fmt:{Field: l, Field2: l , Field3: 3s}",
            struct.pack("<ll3s", 20, 40, b"abc"),
        )
        self.assertEqual(
            self._generate_one(event),
            {
                "ph": "I",
                "pid": "module",
                "name": "label",
                "ts": 10,
                "s": "p",
                "args": {"Field": 20, "Field2": 40, "Field3": "abc"},
            },
        )

    def test_generate_error_json_data_map_bad_fmt(self):
        """A map format using ';' separators expects an invalid-format error."""
        event = _data_event(
            "label",
            "@pw_py_map_fmt:{Field;l,Field2;l}",
            struct.pack("<ll", 20, 40),
        )
        self.assertEqual(
            self._generate_one(event),
            {
                "ph": "I",
                "pid": "module",
                "name": "label",
                "ts": 10,
                "s": "p",
                "args": {"error": f"Invalid map format {event.data_fmt}"},
            },
        )

    def test_generate_error_json_data_map_invalid_small_buffer(self):
        """A payload shorter than the map format expects an error arg."""
        event = _data_event(
            "label",
            "@pw_py_map_fmt:{Field:l,Field2:l}",
            struct.pack("<l", 20),
        )
        decoded = self._generate_one(event)
        mismatch_text = (
            f"Mismatched map/data format {event.data_fmt} "
            f"expected data len {struct.calcsize('<ll')} data "
            f"{event.data.hex()} data len {len(event.data)}"
        )
        self.assertEqual(
            decoded,
            {
                "ph": "I",
                "pid": "module",
                "name": "label",
                "ts": 10,
                "s": "p",
                "args": {"error": mismatch_text},
            },
        )

    def test_generate_error_json_data_map_invalid_large_buffer(self):
        """A payload longer than the map format expects an error arg."""
        event = _data_event(
            "label",
            "@pw_py_map_fmt:{Field:H,Field2:H}",
            struct.pack("<ll", 20, 40),
        )
        decoded = self._generate_one(event)
        mismatch_text = (
            f"Mismatched map/data format {event.data_fmt} "
            f"expected data len {struct.calcsize('<HH')} data "
            f"{event.data.hex()} data len {len(event.data)}"
        )
        self.assertEqual(
            decoded,
            {
                "ph": "I",
                "pid": "module",
                "name": "label",
                "ts": 10,
                "s": "p",
                "args": {"error": mismatch_text},
            },
        )


if __name__ == '__main__':
    unittest.main()