1# File generated from <stdin>, with the command: 2# ./pdl-compiler/scripts/generate_python_backend.py 3# /!\ Do not edit by hand. 4from dataclasses import dataclass, field, fields 5from typing import Optional, List, Tuple, Union 6import enum 7import inspect 8import math 9 10@dataclass 11class Packet: 12 payload: Optional[bytes] = field(repr=False, default_factory=bytes, compare=False) 13 14 @classmethod 15 def parse_all(cls, span: bytes) -> 'Packet': 16 packet, remain = getattr(cls, 'parse')(span) 17 if len(remain) > 0: 18 raise Exception('Unexpected parsing remainder') 19 return packet 20 21 @property 22 def size(self) -> int: 23 pass 24 25 def show(self, prefix: str = ''): 26 print(f'{self.__class__.__name__}') 27 28 def print_val(p: str, pp: str, name: str, align: int, typ, val): 29 if name == 'payload': 30 pass 31 32 # Scalar fields. 33 elif typ is int: 34 print(f'{p}{name:{align}} = {val} (0x{val:x})') 35 36 # Byte fields. 37 elif typ is bytes: 38 print(f'{p}{name:{align}} = [', end='') 39 line = '' 40 n_pp = '' 41 for (idx, b) in enumerate(val): 42 if idx > 0 and idx % 8 == 0: 43 print(f'{n_pp}{line}') 44 line = '' 45 n_pp = pp + (' ' * (align + 4)) 46 line += f' {b:02x}' 47 print(f'{n_pp}{line} ]') 48 49 # Enum fields. 50 elif inspect.isclass(typ) and issubclass(typ, enum.IntEnum): 51 print(f'{p}{name:{align}} = {typ.__name__}::{val.name} (0x{val:x})') 52 53 # Struct fields. 54 elif inspect.isclass(typ) and issubclass(typ, globals().get('Packet')): 55 print(f'{p}{name:{align}} = ', end='') 56 val.show(prefix=pp) 57 58 # Array fields. 59 elif getattr(typ, '__origin__', None) == list: 60 print(f'{p}{name:{align}}') 61 last = len(val) - 1 62 align = 5 63 for (idx, elt) in enumerate(val): 64 n_p = pp + ('├── ' if idx != last else '└── ') 65 n_pp = pp + ('│ ' if idx != last else ' ') 66 print_val(n_p, n_pp, f'[{idx}]', align, typ.__args__[0], val[idx]) 67 68 # Custom fields. 69 elif inspect.isclass(typ): 70 print(f'{p}{name:{align}} = {repr(val)}') 71 72 else: 73 print(f'{p}{name:{align}} = ##{typ}##') 74 75 last = len(fields(self)) - 1 76 align = max(len(f.name) for f in fields(self) if f.name != 'payload') 77 78 for (idx, f) in enumerate(fields(self)): 79 p = prefix + ('├── ' if idx != last else '└── ') 80 pp = prefix + ('│ ' if idx != last else ' ') 81 val = getattr(self, f.name) 82 83 print_val(p, pp, f.name, align, f.type, val) 84 85class PacketBoundaryFlag(enum.IntEnum): 86 COMPLETE = 0x0 87 NOT_COMPLETE = 0x1 88 89 @staticmethod 90 def from_int(v: int) -> Union[int, 'PacketBoundaryFlag']: 91 try: 92 return PacketBoundaryFlag(v) 93 except ValueError as exn: 94 raise exn 95 96 97class MessageType(enum.IntEnum): 98 DATA = 0x0 99 COMMAND = 0x1 100 RESPONSE = 0x2 101 NOTIFICATION = 0x3 102 103 @staticmethod 104 def from_int(v: int) -> Union[int, 'MessageType']: 105 try: 106 return MessageType(v) 107 except ValueError as exn: 108 raise exn 109 110 111class GroupId(enum.IntEnum): 112 CORE = 0x0 113 SESSION_CONFIG = 0x1 114 SESSION_CONTROL = 0x2 115 DATA_CONTROL = 0x3 116 VENDOR_RESERVED_9 = 0x9 117 VENDOR_RESERVED_A = 0xa 118 VENDOR_RESERVED_B = 0xb 119 VENDOR_ANDROID = 0xc 120 TEST = 0xd 121 VENDOR_RESERVED_E = 0xe 122 VENDOR_RESERVED_F = 0xf 123 124 @staticmethod 125 def from_int(v: int) -> Union[int, 'GroupId']: 126 try: 127 return GroupId(v) 128 except ValueError as exn: 129 raise exn 130 131 132class DataPacketFormat(enum.IntEnum): 133 DATA_SND = 0x1 134 DATA_RCV = 0x2 135 136 @staticmethod 137 def from_int(v: int) -> Union[int, 'DataPacketFormat']: 138 try: 139 return DataPacketFormat(v) 140 except ValueError as exn: 141 raise exn 142 143 144class CoreOpcodeId(enum.IntEnum): 145 DEVICE_RESET = 0x0 146 DEVICE_STATUS = 0x1 147 GET_DEVICE_INFO = 0x2 148 GET_CAPS_INFO = 0x3 149 SET_CONFIG = 0x4 150 GET_CONFIG = 0x5 151 GENERIC_ERROR = 0x7 152 QUERY_UWBS_TIMESTAMP = 0x8 153 154 @staticmethod 155 def from_int(v: int) -> Union[int, 'CoreOpcodeId']: 156 try: 157 return CoreOpcodeId(v) 158 except ValueError as exn: 159 raise exn 160 161 162class SessionConfigOpcodeId(enum.IntEnum): 163 INIT = 0x0 164 DEINIT = 0x1 165 STATUS = 0x2 166 SET_APP_CONFIG = 0x3 167 GET_APP_CONFIG = 0x4 168 GET_COUNT = 0x5 169 GET_STATE = 0x6 170 UPDATE_CONTROLLER_MULTICAST_LIST = 0x7 171 UPDATE_DT_ANCHOR_RANGING_ROUNDS = 0x8 172 UPDATE_DT_TAG_RANGING_ROUNDS = 0x9 173 QUERY_DATA_SIZE_IN_RANGING = 0xb 174 175 @staticmethod 176 def from_int(v: int) -> Union[int, 'SessionConfigOpcodeId']: 177 try: 178 return SessionConfigOpcodeId(v) 179 except ValueError as exn: 180 raise exn 181 182 183class SessionControlOpcodeId(enum.IntEnum): 184 START = 0x0 185 STOP = 0x1 186 GET_RANGING_COUNT = 0x3 187 DATA_CREDIT = 0x4 188 DATA_TRANSFER_STATUS = 0x5 189 190 @staticmethod 191 def from_int(v: int) -> Union[int, 'SessionControlOpcodeId']: 192 try: 193 return SessionControlOpcodeId(v) 194 except ValueError as exn: 195 raise exn 196 197 198class AndroidOpcodeId(enum.IntEnum): 199 GET_POWER_STATS = 0x0 200 SET_COUNTRY_CODE = 0x1 201 FIRA_RANGE_DIAGNOSTICS = 0x2 202 203 @staticmethod 204 def from_int(v: int) -> Union[int, 'AndroidOpcodeId']: 205 try: 206 return AndroidOpcodeId(v) 207 except ValueError as exn: 208 raise exn 209 210 211class Status(enum.IntEnum): 212 OK = 0x0 213 REJECTED = 0x1 214 FAILED = 0x2 215 SYNTAX_ERROR = 0x3 216 INVALID_PARAM = 0x4 217 INVALID_RANGE = 0x5 218 INVALID_MESSAGE_SIZE = 0x6 219 UNKNOWN_GID = 0x7 220 UNKNOWN_OID = 0x8 221 READ_ONLY = 0x9 222 UCI_MESSAGE_RETRY = 0xa 223 UNKNOWN = 0xb 224 NOT_APPLICABLE = 0xc 225 ERROR_SESSION_NOT_EXIST = 0x11 226 ERROR_SESSION_DUPLICATE = 0x12 227 ERROR_SESSION_ACTIVE = 0x13 228 ERROR_MAX_SESSIONS_EXCEEDED = 0x14 229 ERROR_SESSION_NOT_CONFIGURED = 0x15 230 ERROR_ACTIVE_SESSIONS_ONGOING = 0x16 231 ERROR_MULTICAST_LIST_FULL = 0x17 232 ERROR_UWB_INITIATION_TIME_TOO_OLD = 0x1a 233 OK_NEGATIVE_DISTANCE_REPORT = 0x1b 234 RANGING_TX_FAILED = 0x20 235 RANGING_RX_TIMEOUT = 0x21 236 RANGING_RX_PHY_DEC_FAILED = 0x22 237 RANGING_RX_PHY_TOA_FAILED = 0x23 238 RANGING_RX_PHY_STS_FAILED = 0x24 239 RANGING_RX_MAC_DEC_FAILED = 0x25 240 RANGING_RX_MAC_IE_DEC_FAILED = 0x26 241 RANGING_RX_MAC_IE_MISSING = 0x27 242 ERROR_ROUND_INDEX_NOT_ACTIVATED = 0x28 243 ERROR_NUMBER_OF_ACTIVE_RANGING_ROUNDS_EXCEEDED = 0x29 244 ERROR_DL_TDOA_DEVICE_ADDRESS_NOT_MATCHING_IN_REPLY_TIME_LIST = 0x2a 245 246 @staticmethod 247 def from_int(v: int) -> Union[int, 'Status']: 248 try: 249 return Status(v) 250 except ValueError as exn: 251 return v 252 253 254class DataRcvStatusCode(enum.IntEnum): 255 UCI_STATUS_SUCCESS = 0x0 256 UCI_STATUS_ERROR = 0x1 257 UCI_STATUS_UNKNOWN = 0x2 258 259 @staticmethod 260 def from_int(v: int) -> Union[int, 'DataRcvStatusCode']: 261 try: 262 return DataRcvStatusCode(v) 263 except ValueError as exn: 264 raise exn 265 266 267class CreditAvailability(enum.IntEnum): 268 CREDIT_NOT_AVAILABLE = 0x0 269 CREDIT_AVAILABLE = 0x1 270 271 @staticmethod 272 def from_int(v: int) -> Union[int, 'CreditAvailability']: 273 try: 274 return CreditAvailability(v) 275 except ValueError as exn: 276 raise exn 277 278 279class DataTransferNtfStatusCode(enum.IntEnum): 280 UCI_DATA_TRANSFER_STATUS_REPETITION_OK = 0x0 281 UCI_DATA_TRANSFER_STATUS_OK = 0x1 282 UCI_DATA_TRANSFER_STATUS_ERROR_DATA_TRANSFER = 0x2 283 UCI_DATA_TRANSFER_STATUS_ERROR_NO_CREDIT_AVAILABLE = 0x3 284 UCI_DATA_TRANSFER_STATUS_ERROR_REJECTED = 0x4 285 UCI_DATA_TRANSFER_STATUS_SESSION_TYPE_NOT_SUPPORTED = 0x5 286 UCI_DATA_TRANSFER_STATUS_ERROR_DATA_TRANSFER_IS_ONGOING = 0x6 287 UCI_DATA_TRANSFER_STATUS_INVALID_FORMAT = 0x7 288 289 @staticmethod 290 def from_int(v: int) -> Union[int, 'DataTransferNtfStatusCode']: 291 try: 292 return DataTransferNtfStatusCode(v) 293 except ValueError as exn: 294 raise exn 295 296 297class ResetConfig(enum.IntEnum): 298 UWBS_RESET = 0x0 299 300 @staticmethod 301 def from_int(v: int) -> Union[int, 'ResetConfig']: 302 try: 303 return ResetConfig(v) 304 except ValueError as exn: 305 raise exn 306 307 308class AppConfigTlvType(enum.IntEnum): 309 DEVICE_TYPE = 0x0 310 RANGING_ROUND_USAGE = 0x1 311 STS_CONFIG = 0x2 312 MULTI_NODE_MODE = 0x3 313 CHANNEL_NUMBER = 0x4 314 NUMBER_OF_CONTROLEES = 0x5 315 DEVICE_MAC_ADDRESS = 0x6 316 DST_MAC_ADDRESS = 0x7 317 SLOT_DURATION = 0x8 318 RANGING_DURATION = 0x9 319 STS_INDEX = 0xa 320 MAC_FCS_TYPE = 0xb 321 RANGING_ROUND_CONTROL = 0xc 322 AOA_RESULT_REQ = 0xd 323 SESSION_INFO_NTF_CONFIG = 0xe 324 NEAR_PROXIMITY_CONFIG = 0xf 325 FAR_PROXIMITY_CONFIG = 0x10 326 DEVICE_ROLE = 0x11 327 RFRAME_CONFIG = 0x12 328 RSSI_REPORTING = 0x13 329 PREAMBLE_CODE_INDEX = 0x14 330 SFD_ID = 0x15 331 PSDU_DATA_RATE = 0x16 332 PREAMBLE_DURATION = 0x17 333 LINK_LAYER_MODE = 0x18 334 DATA_REPETITION_COUNT = 0x19 335 RANGING_TIME_STRUCT = 0x1a 336 SLOTS_PER_RR = 0x1b 337 AOA_BOUND_CONFIG = 0x1d 338 PRF_MODE = 0x1f 339 CAP_SIZE_RANGE = 0x20 340 TX_JITTER_WINDOW_SIZE = 0x21 341 SCHEDULE_MODE = 0x22 342 KEY_ROTATION = 0x23 343 KEY_ROTATION_RATE = 0x24 344 SESSION_PRIORITY = 0x25 345 MAC_ADDRESS_MODE = 0x26 346 VENDOR_ID = 0x27 347 STATIC_STS_IV = 0x28 348 NUMBER_OF_STS_SEGMENTS = 0x29 349 MAX_RR_RETRY = 0x2a 350 UWB_INITIATION_TIME = 0x2b 351 HOPPING_MODE = 0x2c 352 BLOCK_STRIDE_LENGTH = 0x2d 353 RESULT_REPORT_CONFIG = 0x2e 354 IN_BAND_TERMINATION_ATTEMPT_COUNT = 0x2f 355 SUB_SESSION_ID = 0x30 356 BPRF_PHR_DATA_RATE = 0x31 357 MAX_NUMBER_OF_MEASUREMENTS = 0x32 358 STS_LENGTH = 0x35 359 MIN_FRAMES_PER_RR = 0x3a 360 MTU_SIZE = 0x3b 361 INTER_FRAME_INTERVAL = 0x3c 362 DL_TDOA_RANGING_METHOD = 0x3d 363 DL_TDOA_TX_TIMESTAMP_CONF = 0x3e 364 DL_TDOA_HOP_COUNT = 0x3f 365 DL_TDOA_ANCHOR_CFO = 0x40 366 DL_TDOA_ANCHOR_LOCATION = 0x41 367 DL_TDOA_TX_ACTIVE_RANGING_ROUNDS = 0x42 368 DL_TDOA_BLOCK_SKIPPING = 0x43 369 DL_TDOA_TIME_REFERENCE_ANCHOR = 0x44 370 SESSION_KEY = 0x45 371 SUB_SESSION_KEY = 0x46 372 SESSION_DATA_TRANSFER_STATUS_NTF_CONFIG = 0x47 373 SESSION_TIME_BASE = 0x48 374 DL_TDOA_RESPONDER_TOF = 0x49 375 SECURE_RANGING_NEFA_LEVEL = 0x4a 376 SECURE_RANGING_CSW_LENGTH = 0x4b 377 APPLICATION_DATA_ENDPOINT = 0x4c 378 OWR_AOA_MEASUREMENT_NTF_PERIOD = 0x4d 379 380 @staticmethod 381 def from_int(v: int) -> Union[int, 'AppConfigTlvType']: 382 try: 383 return AppConfigTlvType(v) 384 except ValueError as exn: 385 return v 386 387 388class DeviceType(enum.IntEnum): 389 CONTROLEE = 0x0 390 CONTROLLER = 0x1 391 392 @staticmethod 393 def from_int(v: int) -> Union[int, 'DeviceType']: 394 try: 395 return DeviceType(v) 396 except ValueError as exn: 397 raise exn 398 399 400class RangingRoundUsage(enum.IntEnum): 401 SS_TWR_DEFERRED_MODE = 0x1 402 DS_TWR_DEFERRED_MODE = 0x2 403 SS_TWR_NON_DEFERRED_MODE = 0x3 404 DS_TWR_NON_DEFERRED_MODE = 0x4 405 ON_WAY_RANGING_DL_TDOA = 0x5 406 OWR_AOA_MEASUREMENT = 0x6 407 ESS_TWR_NON_DEFERRED = 0x7 408 ADS_TWR_NON_DEFERRED = 0x8 409 410 @staticmethod 411 def from_int(v: int) -> Union[int, 'RangingRoundUsage']: 412 try: 413 return RangingRoundUsage(v) 414 except ValueError as exn: 415 raise exn 416 417 418class StsConfig(enum.IntEnum): 419 STATIC = 0x0 420 DYNAMIC = 0x1 421 DYNAMIC_FOR_RESPONDER_SUB_SESSION_KEY = 0x2 422 PROVISIONED = 0x3 423 PROVISIONED_FOR_RESPONDER_SUB_SESSION_KEY = 0x4 424 425 @staticmethod 426 def from_int(v: int) -> Union[int, 'StsConfig']: 427 try: 428 return StsConfig(v) 429 except ValueError as exn: 430 raise exn 431 432 433class MultiNodeMode(enum.IntEnum): 434 ONE_TO_ONE = 0x0 435 ONE_TO_MANY = 0x1 436 437 @staticmethod 438 def from_int(v: int) -> Union[int, 'MultiNodeMode']: 439 try: 440 return MultiNodeMode(v) 441 except ValueError as exn: 442 raise exn 443 444 445class ChannelNumber(enum.IntEnum): 446 CHANNEL_NUMBER_5 = 0x5 447 CHANNEL_NUMBER_6 = 0x6 448 CHANNEL_NUMBER_8 = 0x8 449 CHANNEL_NUMBER_9 = 0x9 450 CHANNEL_NUMBER_10 = 0xa 451 CHANNEL_NUMBER_12 = 0xc 452 CHANNEL_NUMBER_13 = 0xd 453 CHANNEL_NUMBER_14 = 0xe 454 455 @staticmethod 456 def from_int(v: int) -> Union[int, 'ChannelNumber']: 457 try: 458 return ChannelNumber(v) 459 except ValueError as exn: 460 raise exn 461 462 463class MacFcsType(enum.IntEnum): 464 CRC_16 = 0x0 465 CRC_32 = 0x1 466 467 @staticmethod 468 def from_int(v: int) -> Union[int, 'MacFcsType']: 469 try: 470 return MacFcsType(v) 471 except ValueError as exn: 472 raise exn 473 474 475@dataclass 476class RangingRoundControl(Packet): 477 rrrm: int = field(kw_only=True, default=0) 478 rcp: int = field(kw_only=True, default=0) 479 mrp: int = field(kw_only=True, default=0) 480 mrm: int = field(kw_only=True, default=0) 481 482 def __post_init__(self): 483 pass 484 485 @staticmethod 486 def parse(span: bytes) -> Tuple['RangingRoundControl', bytes]: 487 fields = {'payload': None} 488 if len(span) < 1: 489 raise Exception('Invalid packet size') 490 fields['rrrm'] = (span[0] >> 0) & 0x1 491 if (span[0] >> 1) & 0x1 != 0x1: 492 raise Exception('Unexpected fixed field value') 493 fields['rcp'] = (span[0] >> 2) & 0x1 494 fields['mrp'] = (span[0] >> 6) & 0x1 495 fields['mrm'] = (span[0] >> 7) & 0x1 496 span = span[1:] 497 return RangingRoundControl(**fields), span 498 499 def serialize(self, payload: bytes = None) -> bytes: 500 _span = bytearray() 501 if self.rrrm > 1: 502 print(f"Invalid value for field RangingRoundControl::rrrm: {self.rrrm} > 1; the value will be truncated") 503 self.rrrm &= 1 504 if self.rcp > 1: 505 print(f"Invalid value for field RangingRoundControl::rcp: {self.rcp} > 1; the value will be truncated") 506 self.rcp &= 1 507 if self.mrp > 1: 508 print(f"Invalid value for field RangingRoundControl::mrp: {self.mrp} > 1; the value will be truncated") 509 self.mrp &= 1 510 if self.mrm > 1: 511 print(f"Invalid value for field RangingRoundControl::mrm: {self.mrm} > 1; the value will be truncated") 512 self.mrm &= 1 513 _value = ( 514 (self.rrrm << 0) | 515 (1 << 1) | 516 (self.rcp << 2) | 517 (self.mrp << 6) | 518 (self.mrm << 7) 519 ) 520 _span.append(_value) 521 return bytes(_span) 522 523 @property 524 def size(self) -> int: 525 return 1 526 527class AoaResultReq(enum.IntEnum): 528 AOA_DISABLED = 0x0 529 AOA_ENABLED = 0x1 530 AOA_ENABLED_AZIMUTH_ONLY = 0x2 531 AOA_ENABLED_ELEVATION_ONLY = 0x3 532 533 @staticmethod 534 def from_int(v: int) -> Union[int, 'AoaResultReq']: 535 try: 536 return AoaResultReq(v) 537 except ValueError as exn: 538 raise exn 539 540 541class SessionInfoNtfConfig(enum.IntEnum): 542 DISABLE = 0x0 543 ENABLE = 0x1 544 ENABLE_PROXIMITY_TRIGGER = 0x2 545 ENABLE_AOA_TRIGGER = 0x3 546 ENABLE_PROXIMITY_AOA_TRIGGER = 0x4 547 ENABLE_PROXIMITY_EDGE_TRIGGER = 0x5 548 ENABLE_AOA_EDGE_TRIGGER = 0x6 549 ENABLE_PROXIMITY_AOA_EDGE_TRIGGER = 0x7 550 551 @staticmethod 552 def from_int(v: int) -> Union[int, 'SessionInfoNtfConfig']: 553 try: 554 return SessionInfoNtfConfig(v) 555 except ValueError as exn: 556 raise exn 557 558 559class DeviceRole(enum.IntEnum): 560 RESPONDER = 0x0 561 INITIATOR = 0x1 562 ADVERTISER = 0x5 563 OBSERVER = 0x6 564 DT_ANCHOR = 0x7 565 DT_TAG = 0x8 566 567 @staticmethod 568 def from_int(v: int) -> Union[int, 'DeviceRole']: 569 try: 570 return DeviceRole(v) 571 except ValueError as exn: 572 raise exn 573 574 575class RframeConfig(enum.IntEnum): 576 SP0 = 0x0 577 SP1 = 0x1 578 SP3 = 0x3 579 580 @staticmethod 581 def from_int(v: int) -> Union[int, 'RframeConfig']: 582 try: 583 return RframeConfig(v) 584 except ValueError as exn: 585 raise exn 586 587 588class RssiReporting(enum.IntEnum): 589 DISABLE = 0x0 590 ENABLE = 0x1 591 592 @staticmethod 593 def from_int(v: int) -> Union[int, 'RssiReporting']: 594 try: 595 return RssiReporting(v) 596 except ValueError as exn: 597 raise exn 598 599 600class PsduDataRate(enum.IntEnum): 601 DATA_RATE_6M81 = 0x0 602 DATA_RATE_7M80 = 0x1 603 DATA_RATE_27M2 = 0x2 604 DATA_RATE_31M2 = 0x3 605 606 @staticmethod 607 def from_int(v: int) -> Union[int, 'PsduDataRate']: 608 try: 609 return PsduDataRate(v) 610 except ValueError as exn: 611 raise exn 612 613 614class PreambleDuration(enum.IntEnum): 615 DURATION_32_SYMBOLS = 0x0 616 DURATION_64_SYMBOLS = 0x1 617 618 @staticmethod 619 def from_int(v: int) -> Union[int, 'PreambleDuration']: 620 try: 621 return PreambleDuration(v) 622 except ValueError as exn: 623 raise exn 624 625 626class LinkLayerMode(enum.IntEnum): 627 BYPASS_MODE = 0x0 628 629 @staticmethod 630 def from_int(v: int) -> Union[int, 'LinkLayerMode']: 631 try: 632 return LinkLayerMode(v) 633 except ValueError as exn: 634 raise exn 635 636 637class RangingTimeStruct(enum.IntEnum): 638 BLOCK_BASED_SCHEDULING = 0x1 639 640 @staticmethod 641 def from_int(v: int) -> Union[int, 'RangingTimeStruct']: 642 try: 643 return RangingTimeStruct(v) 644 except ValueError as exn: 645 raise exn 646 647 648class PrfMode(enum.IntEnum): 649 BPRF_MODE = 0x0 650 HPRF_MODE_124M8 = 0x1 651 HPRF_MODE_249M6 = 0x2 652 653 @staticmethod 654 def from_int(v: int) -> Union[int, 'PrfMode']: 655 try: 656 return PrfMode(v) 657 except ValueError as exn: 658 raise exn 659 660 661class ScheduleMode(enum.IntEnum): 662 CONTENTION_BASED = 0x0 663 TIME_SCHEDULED = 0x1 664 665 @staticmethod 666 def from_int(v: int) -> Union[int, 'ScheduleMode']: 667 try: 668 return ScheduleMode(v) 669 except ValueError as exn: 670 raise exn 671 672 673class KeyRotation(enum.IntEnum): 674 DISABLE = 0x0 675 ENABLE = 0x1 676 677 @staticmethod 678 def from_int(v: int) -> Union[int, 'KeyRotation']: 679 try: 680 return KeyRotation(v) 681 except ValueError as exn: 682 raise exn 683 684 685class MacAddressMode(enum.IntEnum): 686 MODE_0 = 0x0 687 MODE_1 = 0x1 688 MODE_2 = 0x2 689 690 @staticmethod 691 def from_int(v: int) -> Union[int, 'MacAddressMode']: 692 try: 693 return MacAddressMode(v) 694 except ValueError as exn: 695 raise exn 696 697 698class HoppingMode(enum.IntEnum): 699 DISABLE = 0x0 700 ENABLE = 0x1 701 702 @staticmethod 703 def from_int(v: int) -> Union[int, 'HoppingMode']: 704 try: 705 return HoppingMode(v) 706 except ValueError as exn: 707 raise exn 708 709 710@dataclass 711class ResultReportConfig(Packet): 712 tof: int = field(kw_only=True, default=0) 713 aoa_azimuth: int = field(kw_only=True, default=0) 714 aoa_elevation: int = field(kw_only=True, default=0) 715 aoa_fom: int = field(kw_only=True, default=0) 716 717 def __post_init__(self): 718 pass 719 720 @staticmethod 721 def parse(span: bytes) -> Tuple['ResultReportConfig', bytes]: 722 fields = {'payload': None} 723 if len(span) < 1: 724 raise Exception('Invalid packet size') 725 fields['tof'] = (span[0] >> 0) & 0x1 726 fields['aoa_azimuth'] = (span[0] >> 1) & 0x1 727 fields['aoa_elevation'] = (span[0] >> 2) & 0x1 728 fields['aoa_fom'] = (span[0] >> 3) & 0x1 729 span = span[1:] 730 return ResultReportConfig(**fields), span 731 732 def serialize(self, payload: bytes = None) -> bytes: 733 _span = bytearray() 734 if self.tof > 1: 735 print(f"Invalid value for field ResultReportConfig::tof: {self.tof} > 1; the value will be truncated") 736 self.tof &= 1 737 if self.aoa_azimuth > 1: 738 print(f"Invalid value for field ResultReportConfig::aoa_azimuth: {self.aoa_azimuth} > 1; the value will be truncated") 739 self.aoa_azimuth &= 1 740 if self.aoa_elevation > 1: 741 print(f"Invalid value for field ResultReportConfig::aoa_elevation: {self.aoa_elevation} > 1; the value will be truncated") 742 self.aoa_elevation &= 1 743 if self.aoa_fom > 1: 744 print(f"Invalid value for field ResultReportConfig::aoa_fom: {self.aoa_fom} > 1; the value will be truncated") 745 self.aoa_fom &= 1 746 _value = ( 747 (self.tof << 0) | 748 (self.aoa_azimuth << 1) | 749 (self.aoa_elevation << 2) | 750 (self.aoa_fom << 3) 751 ) 752 _span.append(_value) 753 return bytes(_span) 754 755 @property 756 def size(self) -> int: 757 return 1 758 759class BprfPhrDataRate(enum.IntEnum): 760 DATA_RATE_850K = 0x0 761 DATA_RATE_6M81 = 0x1 762 763 @staticmethod 764 def from_int(v: int) -> Union[int, 'BprfPhrDataRate']: 765 try: 766 return BprfPhrDataRate(v) 767 except ValueError as exn: 768 raise exn 769 770 771class StsLength(enum.IntEnum): 772 LENGTH_32_SYMBOLS = 0x0 773 LENGTH_64_SYMBOLS = 0x1 774 LENGTH_128_SYMBOLS = 0x2 775 776 @staticmethod 777 def from_int(v: int) -> Union[int, 'StsLength']: 778 try: 779 return StsLength(v) 780 except ValueError as exn: 781 raise exn 782 783 784class DlTdoaRangingMethod(enum.IntEnum): 785 SS_TWR = 0x0 786 DS_TWR = 0x1 787 788 @staticmethod 789 def from_int(v: int) -> Union[int, 'DlTdoaRangingMethod']: 790 try: 791 return DlTdoaRangingMethod(v) 792 except ValueError as exn: 793 raise exn 794 795 796class DlTdoaAnchorCfo(enum.IntEnum): 797 ANCHOR_CFO_NOT_INCLUDED = 0x0 798 ANCHOR_CFO_INCLUDED = 0x1 799 800 @staticmethod 801 def from_int(v: int) -> Union[int, 'DlTdoaAnchorCfo']: 802 try: 803 return DlTdoaAnchorCfo(v) 804 except ValueError as exn: 805 raise exn 806 807 808class SessionDataTransferStatusNtfConfig(enum.IntEnum): 809 DISABLE = 0x0 810 ENABLE = 0x1 811 812 @staticmethod 813 def from_int(v: int) -> Union[int, 'SessionDataTransferStatusNtfConfig']: 814 try: 815 return SessionDataTransferStatusNtfConfig(v) 816 except ValueError as exn: 817 raise exn 818 819 820class CapTlvType(enum.IntEnum): 821 SUPPORTED_FIRA_PHY_VERSION_RANGE = 0x0 822 SUPPORTED_FIRA_MAC_VERSION_RANGE = 0x1 823 SUPPORTED_DEVICE_ROLES = 0x2 824 SUPPORTED_RANGING_METHOD = 0x3 825 SUPPORTED_STS_CONFIG = 0x4 826 SUPPORTED_MULTI_NODE_MODES = 0x5 827 SUPPORTED_RANGING_TIME_STRUCT = 0x6 828 SUPPORTED_SCHEDULED_MODE = 0x7 829 SUPPORTED_HOPPING_MODE = 0x8 830 SUPPORTED_BLOCK_STRIDING = 0x9 831 SUPPORTED_UWB_INITIATION_TIME = 0xa 832 SUPPORTED_CHANNELS = 0xb 833 SUPPORTED_RFRAME_CONFIG = 0xc 834 SUPPORTED_CC_CONSTRAINT_LENGTH = 0xd 835 SUPPORTED_BPRF_PARAMETER_SETS = 0xe 836 SUPPORTED_HPRF_PARAMETER_SETS = 0xf 837 SUPPORTED_AOA = 0x10 838 SUPPORTED_EXTENDED_MAC_ADDRESS = 0x11 839 SUPPORTED_MAX_MESSAGE_SIZE = 0x12 840 SUPPORTED_MAX_DATA_PACKET_PAYLOAD_SIZE = 0x13 841 SUPPORTED_POWER_STATS = 0xc0 842 843 @staticmethod 844 def from_int(v: int) -> Union[int, 'CapTlvType']: 845 try: 846 return CapTlvType(v) 847 except ValueError as exn: 848 return v 849 850 851class AoaResultReqType(enum.IntEnum): 852 AOA_DISABLE = 0x0 853 AOA_ENABLE = 0x1 854 AOA_ENABLE_AZIMUTH = 0x2 855 AOA_ENABLE_ELEVATION = 0x3 856 AOA_ENABLE_INTERLEAVED = 0xf0 857 858 @staticmethod 859 def from_int(v: int) -> Union[int, 'AoaResultReqType']: 860 try: 861 return AoaResultReqType(v) 862 except ValueError as exn: 863 raise exn 864 865 866class DeviceState(enum.IntEnum): 867 DEVICE_STATE_READY = 0x1 868 DEVICE_STATE_ACTIVE = 0x2 869 DEVICE_STATE_ERROR = 0xff 870 871 @staticmethod 872 def from_int(v: int) -> Union[int, 'DeviceState']: 873 try: 874 return DeviceState(v) 875 except ValueError as exn: 876 raise exn 877 878 879class SessionState(enum.IntEnum): 880 SESSION_STATE_INIT = 0x0 881 SESSION_STATE_DEINIT = 0x1 882 SESSION_STATE_ACTIVE = 0x2 883 SESSION_STATE_IDLE = 0x3 884 885 @staticmethod 886 def from_int(v: int) -> Union[int, 'SessionState']: 887 try: 888 return SessionState(v) 889 except ValueError as exn: 890 raise exn 891 892 893class ReasonCode(enum.IntEnum): 894 STATE_CHANGE_WITH_SESSION_MANAGEMENT_COMMANDS = 0x0 895 MAX_RANGING_ROUND_RETRY_COUNT_REACHED = 0x1 896 MAX_NUMBER_OF_MEASUREMENTS_REACHED = 0x2 897 SESSION_SUSPENDED_DUE_TO_INBAND_SIGNAL = 0x3 898 SESSION_RESUMED_DUE_TO_INBAND_SIGNAL = 0x4 899 SESSION_STOPPED_DUE_TO_INBAND_SIGNAL = 0x5 900 ERROR_INVALID_UL_TDOA_RANDOM_WINDOW = 0x1d 901 ERROR_MIN_RFRAMES_PER_RR_NOT_SUPPORTED = 0x1e 902 ERROR_TX_DELAY_NOT_SUPPORTED = 0x1f 903 ERROR_SLOT_LENGTH_NOT_SUPPORTED = 0x20 904 ERROR_INSUFFICIENT_SLOTS_PER_RR = 0x21 905 ERROR_MAC_ADDRESS_MODE_NOT_SUPPORTED = 0x22 906 ERROR_INVALID_RANGING_DURATION = 0x23 907 ERROR_INVALID_STS_CONFIG = 0x24 908 ERROR_INVALID_RFRAME_CONFIG = 0x25 909 ERROR_HUS_NOT_ENOUGH_SLOTS = 0x26 910 ERROR_HUS_CFP_PHASE_TOO_SHORT = 0x27 911 ERROR_HUS_CAP_PHASE_TOO_SHORT = 0x28 912 ERROR_HUS_OTHERS = 0x29 913 ERROR_STATUS_SESSION_KEY_NOT_FOUND = 0x2a 914 ERROR_STATUS_SUB_SESSION_KEY_NOT_FOUND = 0x2b 915 ERROR_INVALID_PREAMBLE_CODE_INDEX = 0x2c 916 ERROR_INVALID_SFD_ID = 0x2d 917 ERROR_INVALID_PSDU_DATA_RATE = 0x2e 918 ERROR_INVALID_PHR_DATA_RATE = 0x2f 919 ERROR_INVALID_PREAMBLE_DURATION = 0x30 920 ERROR_INVALID_STS_LENGTH = 0x31 921 ERROR_INVALID_NUM_OF_STS_SEGMENTS = 0x32 922 ERROR_INVALID_NUM_OF_CONTROLEES = 0x33 923 ERROR_MAX_RANGING_REPLY_TIME_EXCEEDED = 0x34 924 ERROR_INVALID_DST_ADDRESS_LIST = 0x35 925 ERROR_INVALID_OR_NOT_FOUND_SUB_SESSION_ID = 0x36 926 ERROR_INVALID_RESULT_REPORT_CONFIG = 0x37 927 ERROR_INVALID_RANGING_ROUND_CONTROL_CONFIG = 0x38 928 ERROR_INVALID_RANGING_ROUND_USAGE = 0x39 929 ERROR_INVALID_MULTI_NODE_MODE = 0x3a 930 ERROR_RDS_FETCH_FAILURE = 0x3b 931 ERROR_REF_UWB_SESSION_DOES_NOT_EXIST = 0x3c 932 ERROR_REF_UWB_SESSION_RANGING_DURATION_MISMATCH = 0x3d 933 ERROR_REF_UWB_SESSION_INVALID_OFFSET_TIME = 0x3e 934 ERROR_REF_UWB_SESSION_LOST = 0x3f 935 VENDOR_SPECIFIC_REASON_CODE_2 = 0xff 936 937 @staticmethod 938 def from_int(v: int) -> Union[int, 'ReasonCode']: 939 try: 940 return ReasonCode(v) 941 except ValueError as exn: 942 return v 943 944 945class MulticastUpdateStatus(enum.IntEnum): 946 OK_MULTICAST_LIST_UPDATE = 0x0 947 ERROR_MULTICAST_LIST_FULL = 0x1 948 ERROR_KEY_FETCH_FAIL = 0x2 949 ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3 950 ERROR_SUB_SESSION_KEY_NOT_FOUND = 0x4 951 ERROR_SUB_SESSION_KEY_NOT_APPLICABLE = 0x5 952 ERROR_SESSION_KEY_NOT_FOUND = 0x6 953 ERROR_ADDRESS_NOT_FOUND = 0x7 954 ERROR_ADDRESS_ALREADY_PRESENT = 0x8 955 956 @staticmethod 957 def from_int(v: int) -> Union[int, 'MulticastUpdateStatus']: 958 try: 959 return MulticastUpdateStatus(v) 960 except ValueError as exn: 961 raise exn 962 963 964class MacAddressIndicator(enum.IntEnum): 965 SHORT_ADDRESS = 0x0 966 EXTENDED_ADDRESS = 0x1 967 968 @staticmethod 969 def from_int(v: int) -> Union[int, 'MacAddressIndicator']: 970 try: 971 return MacAddressIndicator(v) 972 except ValueError as exn: 973 raise exn 974 975 976class SessionType(enum.IntEnum): 977 FIRA_RANGING_SESSION = 0x0 978 FIRA_RANGING_AND_IN_BAND_DATA_SESSION = 0x1 979 FIRA_DATA_TRANSFER_SESSION = 0x2 980 FIRA_RANGING_ONLY_PHASE = 0x3 981 FIRA_IN_BAND_DATA_PHASE = 0x4 982 FIRA_RANGING_WITH_DATA_PHASE = 0x5 983 CCC = 0xa0 984 DEVICE_TEST_MODE = 0xd0 985 986 @staticmethod 987 def from_int(v: int) -> Union[int, 'SessionType']: 988 try: 989 return SessionType(v) 990 except ValueError as exn: 991 raise exn 992 993 994@dataclass 995class CommonPacketHeader(Packet): 996 pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE) 997 mt: MessageType = field(kw_only=True, default=MessageType.DATA) 998 999 def __post_init__(self): 1000 pass 1001 1002 @staticmethod 1003 def parse(span: bytes) -> Tuple['CommonPacketHeader', bytes]: 1004 fields = {'payload': None} 1005 if len(span) < 1: 1006 raise Exception('Invalid packet size') 1007 fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1) 1008 fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7) 1009 span = span[1:] 1010 return CommonPacketHeader(**fields), span 1011 1012 def serialize(self, payload: bytes = None) -> bytes: 1013 _span = bytearray() 1014 _value = ( 1015 (self.pbf << 4) | 1016 (self.mt << 5) 1017 ) 1018 _span.append(_value) 1019 return bytes(_span) 1020 1021 @property 1022 def size(self) -> int: 1023 return 1 1024 1025@dataclass 1026class ControlPacketHeader(Packet): 1027 pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE) 1028 mt: MessageType = field(kw_only=True, default=MessageType.DATA) 1029 payload_length: int = field(kw_only=True, default=0) 1030 1031 def __post_init__(self): 1032 pass 1033 1034 @staticmethod 1035 def parse(span: bytes) -> Tuple['ControlPacketHeader', bytes]: 1036 fields = {'payload': None} 1037 if len(span) < 4: 1038 raise Exception('Invalid packet size') 1039 fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1) 1040 fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7) 1041 value_ = int.from_bytes(span[1:3], byteorder='little') 1042 fields['payload_length'] = span[3] 1043 span = span[4:] 1044 return ControlPacketHeader(**fields), span 1045 1046 def serialize(self, payload: bytes = None) -> bytes: 1047 _span = bytearray() 1048 _value = ( 1049 (self.pbf << 4) | 1050 (self.mt << 5) 1051 ) 1052 _span.append(_value) 1053 _span.extend([0] * 2) 1054 if self.payload_length > 255: 1055 print(f"Invalid value for field ControlPacketHeader::payload_length: {self.payload_length} > 255; the value will be truncated") 1056 self.payload_length &= 255 1057 _span.append((self.payload_length << 0)) 1058 return bytes(_span) 1059 1060 @property 1061 def size(self) -> int: 1062 return 4 1063 1064@dataclass 1065class DataPacketHeader(Packet): 1066 pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE) 1067 mt: MessageType = field(kw_only=True, default=MessageType.DATA) 1068 payload_length: int = field(kw_only=True, default=0) 1069 1070 def __post_init__(self): 1071 pass 1072 1073 @staticmethod 1074 def parse(span: bytes) -> Tuple['DataPacketHeader', bytes]: 1075 fields = {'payload': None} 1076 if len(span) < 4: 1077 raise Exception('Invalid packet size') 1078 fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1) 1079 fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7) 1080 value_ = int.from_bytes(span[2:4], byteorder='little') 1081 fields['payload_length'] = value_ 1082 span = span[4:] 1083 return DataPacketHeader(**fields), span 1084 1085 def serialize(self, payload: bytes = None) -> bytes: 1086 _span = bytearray() 1087 _value = ( 1088 (self.pbf << 4) | 1089 (self.mt << 5) 1090 ) 1091 _span.append(_value) 1092 _span.extend([0] * 1) 1093 if self.payload_length > 65535: 1094 print(f"Invalid value for field DataPacketHeader::payload_length: {self.payload_length} > 65535; the value will be truncated") 1095 self.payload_length &= 65535 1096 _span.extend(int.to_bytes((self.payload_length << 0), length=2, byteorder='little')) 1097 return bytes(_span) 1098 1099 @property 1100 def size(self) -> int: 1101 return 4 1102 1103@dataclass 1104class ControlPacket(Packet): 1105 gid: GroupId = field(kw_only=True, default=GroupId.CORE) 1106 mt: MessageType = field(kw_only=True, default=MessageType.DATA) 1107 1108 def __post_init__(self): 1109 pass 1110 1111 @staticmethod 1112 def parse(span: bytes) -> Tuple['ControlPacket', bytes]: 1113 fields = {'payload': None} 1114 if len(span) < 1: 1115 raise Exception('Invalid packet size') 1116 fields['gid'] = GroupId.from_int((span[0] >> 0) & 0xf) 1117 fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7) 1118 span = span[1:] 1119 payload = span 1120 span = bytes([]) 1121 fields['payload'] = payload 1122 try: 1123 return CorePacket.parse(fields.copy(), payload) 1124 except Exception as exn: 1125 pass 1126 try: 1127 return SessionConfigPacket.parse(fields.copy(), payload) 1128 except Exception as exn: 1129 pass 1130 try: 1131 return SessionControlPacket.parse(fields.copy(), payload) 1132 except Exception as exn: 1133 pass 1134 try: 1135 return AndroidPacket.parse(fields.copy(), payload) 1136 except Exception as exn: 1137 pass 1138 return ControlPacket(**fields), span 1139 1140 def serialize(self, payload: bytes = None) -> bytes: 1141 _span = bytearray() 1142 _value = ( 1143 (self.gid << 0) | 1144 (self.mt << 5) 1145 ) 1146 _span.append(_value) 1147 _span.extend(payload or self.payload or []) 1148 return bytes(_span) 1149 1150 @property 1151 def size(self) -> int: 1152 return len(self.payload) + 1 1153 1154@dataclass 1155class DataPacket(Packet): 1156 dpf: DataPacketFormat = field(kw_only=True, default=DataPacketFormat.DATA_SND) 1157 pbf: PacketBoundaryFlag = field(kw_only=True, default=PacketBoundaryFlag.COMPLETE) 1158 mt: MessageType = field(kw_only=True, default=MessageType.DATA) 1159 1160 def __post_init__(self): 1161 pass 1162 1163 @staticmethod 1164 def parse(span: bytes) -> Tuple['DataPacket', bytes]: 1165 fields = {'payload': None} 1166 if len(span) < 4: 1167 raise Exception('Invalid packet size') 1168 fields['dpf'] = DataPacketFormat.from_int((span[0] >> 0) & 0xf) 1169 fields['pbf'] = PacketBoundaryFlag.from_int((span[0] >> 4) & 0x1) 1170 fields['mt'] = MessageType.from_int((span[0] >> 5) & 0x7) 1171 value_ = int.from_bytes(span[2:4], byteorder='little') 1172 span = span[4:] 1173 payload = span 1174 span = bytes([]) 1175 fields['payload'] = payload 1176 try: 1177 return DataMessageSnd.parse(fields.copy(), payload) 1178 except Exception as exn: 1179 pass 1180 try: 1181 return DataMessageRcv.parse(fields.copy(), payload) 1182 except Exception as exn: 1183 pass 1184 return DataPacket(**fields), span 1185 1186 def serialize(self, payload: bytes = None) -> bytes: 1187 _span = bytearray() 1188 _value = ( 1189 (self.dpf << 0) | 1190 (self.pbf << 4) | 1191 (self.mt << 5) 1192 ) 1193 _span.append(_value) 1194 _span.extend([0] * 1) 1195 _span.extend([0] * 2) 1196 _span.extend(payload or self.payload or []) 1197 return bytes(_span) 1198 1199 @property 1200 def size(self) -> int: 1201 return len(self.payload) + 4 1202 1203@dataclass 1204class DataMessageSnd(DataPacket): 1205 session_handle: int = field(kw_only=True, default=0) 1206 destination_address: int = field(kw_only=True, default=0) 1207 data_sequence_number: int = field(kw_only=True, default=0) 1208 application_data: bytearray = field(kw_only=True, default_factory=bytearray) 1209 1210 def __post_init__(self): 1211 self.dpf = DataPacketFormat.DATA_SND 1212 self.mt = MessageType.DATA 1213 1214 @staticmethod 1215 def parse(fields: dict, span: bytes) -> Tuple['DataMessageSnd', bytes]: 1216 if fields['dpf'] != DataPacketFormat.DATA_SND or fields['mt'] != MessageType.DATA: 1217 raise Exception("Invalid constraint field values") 1218 if len(span) < 16: 1219 raise Exception('Invalid packet size') 1220 value_ = int.from_bytes(span[0:4], byteorder='little') 1221 fields['session_handle'] = value_ 1222 value_ = int.from_bytes(span[4:12], byteorder='little') 1223 fields['destination_address'] = value_ 1224 value_ = int.from_bytes(span[12:14], byteorder='little') 1225 fields['data_sequence_number'] = value_ 1226 value_ = int.from_bytes(span[14:16], byteorder='little') 1227 application_data_size = value_ 1228 span = span[16:] 1229 if len(span) < application_data_size: 1230 raise Exception('Invalid packet size') 1231 fields['application_data'] = list(span[:application_data_size]) 1232 span = span[application_data_size:] 1233 return DataMessageSnd(**fields), span 1234 1235 def serialize(self, payload: bytes = None) -> bytes: 1236 _span = bytearray() 1237 if self.session_handle > 4294967295: 1238 print(f"Invalid value for field DataMessageSnd::session_handle: {self.session_handle} > 4294967295; the value will be truncated") 1239 self.session_handle &= 4294967295 1240 _span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little')) 1241 if self.destination_address > 18446744073709551615: 1242 print(f"Invalid value for field DataMessageSnd::destination_address: {self.destination_address} > 18446744073709551615; the value will be truncated") 1243 self.destination_address &= 18446744073709551615 1244 _span.extend(int.to_bytes((self.destination_address << 0), length=8, byteorder='little')) 1245 if self.data_sequence_number > 65535: 1246 print(f"Invalid value for field DataMessageSnd::data_sequence_number: {self.data_sequence_number} > 65535; the value will be truncated") 1247 self.data_sequence_number &= 65535 1248 _span.extend(int.to_bytes((self.data_sequence_number << 0), length=2, byteorder='little')) 1249 _span.extend(int.to_bytes(((len(self.application_data) * 1) << 0), length=2, byteorder='little')) 1250 _span.extend(self.application_data) 1251 return DataPacket.serialize(self, payload = bytes(_span)) 1252 1253 @property 1254 def size(self) -> int: 1255 return len(self.application_data) * 1 + 16 1256 1257@dataclass 1258class DataMessageRcv(DataPacket): 1259 session_handle: int = field(kw_only=True, default=0) 1260 status: Status = field(kw_only=True, default=Status.OK) 1261 source_address: int = field(kw_only=True, default=0) 1262 data_sequence_number: int = field(kw_only=True, default=0) 1263 application_data: bytearray = field(kw_only=True, default_factory=bytearray) 1264 1265 def __post_init__(self): 1266 self.dpf = DataPacketFormat.DATA_RCV 1267 self.mt = MessageType.DATA 1268 1269 @staticmethod 1270 def parse(fields: dict, span: bytes) -> Tuple['DataMessageRcv', bytes]: 1271 if fields['dpf'] != DataPacketFormat.DATA_RCV or fields['mt'] != MessageType.DATA: 1272 raise Exception("Invalid constraint field values") 1273 if len(span) < 17: 1274 raise Exception('Invalid packet size') 1275 value_ = int.from_bytes(span[0:4], byteorder='little') 1276 fields['session_handle'] = value_ 1277 fields['status'] = Status.from_int(span[4]) 1278 value_ = int.from_bytes(span[5:13], byteorder='little') 1279 fields['source_address'] = value_ 1280 value_ = int.from_bytes(span[13:15], byteorder='little') 1281 fields['data_sequence_number'] = value_ 1282 value_ = int.from_bytes(span[15:17], byteorder='little') 1283 application_data_size = value_ 1284 span = span[17:] 1285 if len(span) < application_data_size: 1286 raise Exception('Invalid packet size') 1287 fields['application_data'] = list(span[:application_data_size]) 1288 span = span[application_data_size:] 1289 return DataMessageRcv(**fields), span 1290 1291 def serialize(self, payload: bytes = None) -> bytes: 1292 _span = bytearray() 1293 if self.session_handle > 4294967295: 1294 print(f"Invalid value for field DataMessageRcv::session_handle: {self.session_handle} > 4294967295; the value will be truncated") 1295 self.session_handle &= 4294967295 1296 _span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little')) 1297 _span.append((self.status << 0)) 1298 if self.source_address > 18446744073709551615: 1299 print(f"Invalid value for field DataMessageRcv::source_address: {self.source_address} > 18446744073709551615; the value will be truncated") 1300 self.source_address &= 18446744073709551615 1301 _span.extend(int.to_bytes((self.source_address << 0), length=8, byteorder='little')) 1302 if self.data_sequence_number > 65535: 1303 print(f"Invalid value for field DataMessageRcv::data_sequence_number: {self.data_sequence_number} > 65535; the value will be truncated") 1304 self.data_sequence_number &= 65535 1305 _span.extend(int.to_bytes((self.data_sequence_number << 0), length=2, byteorder='little')) 1306 _span.extend(int.to_bytes(((len(self.application_data) * 1) << 0), length=2, byteorder='little')) 1307 _span.extend(self.application_data) 1308 return DataPacket.serialize(self, payload = bytes(_span)) 1309 1310 @property 1311 def size(self) -> int: 1312 return len(self.application_data) * 1 + 17 1313 1314@dataclass 1315class CorePacket(ControlPacket): 1316 oid: CoreOpcodeId = field(kw_only=True, default=CoreOpcodeId.DEVICE_RESET) 1317 1318 def __post_init__(self): 1319 self.gid = GroupId.CORE 1320 1321 @staticmethod 1322 def parse(fields: dict, span: bytes) -> Tuple['CorePacket', bytes]: 1323 if fields['gid'] != GroupId.CORE: 1324 raise Exception("Invalid constraint field values") 1325 if len(span) < 3: 1326 raise Exception('Invalid packet size') 1327 fields['oid'] = CoreOpcodeId.from_int((span[0] >> 0) & 0x3f) 1328 value_ = int.from_bytes(span[1:3], byteorder='little') 1329 span = span[3:] 1330 payload = span 1331 span = bytes([]) 1332 fields['payload'] = payload 1333 try: 1334 return CoreDeviceResetCmd.parse(fields.copy(), payload) 1335 except Exception as exn: 1336 pass 1337 try: 1338 return CoreDeviceResetRsp.parse(fields.copy(), payload) 1339 except Exception as exn: 1340 pass 1341 try: 1342 return CoreDeviceStatusNtf.parse(fields.copy(), payload) 1343 except Exception as exn: 1344 pass 1345 try: 1346 return CoreGetDeviceInfoCmd.parse(fields.copy(), payload) 1347 except Exception as exn: 1348 pass 1349 try: 1350 return CoreGetDeviceInfoRsp.parse(fields.copy(), payload) 1351 except Exception as exn: 1352 pass 1353 try: 1354 return CoreGetCapsInfoCmd.parse(fields.copy(), payload) 1355 except Exception as exn: 1356 pass 1357 try: 1358 return CoreGetCapsInfoRsp.parse(fields.copy(), payload) 1359 except Exception as exn: 1360 pass 1361 try: 1362 return CoreSetConfigCmd.parse(fields.copy(), payload) 1363 except Exception as exn: 1364 pass 1365 try: 1366 return CoreSetConfigRsp.parse(fields.copy(), payload) 1367 except Exception as exn: 1368 pass 1369 try: 1370 return CoreGetConfigCmd.parse(fields.copy(), payload) 1371 except Exception as exn: 1372 pass 1373 try: 1374 return CoreGetConfigRsp.parse(fields.copy(), payload) 1375 except Exception as exn: 1376 pass 1377 try: 1378 return CoreGenericErrorNtf.parse(fields.copy(), payload) 1379 except Exception as exn: 1380 pass 1381 try: 1382 return CoreQueryTimeStampCmd.parse(fields.copy(), payload) 1383 except Exception as exn: 1384 pass 1385 try: 1386 return CoreQueryTimeStampRsp.parse(fields.copy(), payload) 1387 except Exception as exn: 1388 pass 1389 return CorePacket(**fields), span 1390 1391 def serialize(self, payload: bytes = None) -> bytes: 1392 _span = bytearray() 1393 _span.append((self.oid << 0)) 1394 _span.extend([0] * 2) 1395 _span.extend(payload or self.payload or []) 1396 return ControlPacket.serialize(self, payload = bytes(_span)) 1397 1398 @property 1399 def size(self) -> int: 1400 return len(self.payload) + 3 1401 1402@dataclass 1403class SessionConfigPacket(ControlPacket): 1404 oid: SessionConfigOpcodeId = field(kw_only=True, default=SessionConfigOpcodeId.INIT) 1405 1406 def __post_init__(self): 1407 self.gid = GroupId.SESSION_CONFIG 1408 1409 @staticmethod 1410 def parse(fields: dict, span: bytes) -> Tuple['SessionConfigPacket', bytes]: 1411 if fields['gid'] != GroupId.SESSION_CONFIG: 1412 raise Exception("Invalid constraint field values") 1413 if len(span) < 3: 1414 raise Exception('Invalid packet size') 1415 fields['oid'] = SessionConfigOpcodeId.from_int((span[0] >> 0) & 0x3f) 1416 value_ = int.from_bytes(span[1:3], byteorder='little') 1417 span = span[3:] 1418 payload = span 1419 span = bytes([]) 1420 fields['payload'] = payload 1421 try: 1422 return SessionInitCmd.parse(fields.copy(), payload) 1423 except Exception as exn: 1424 pass 1425 try: 1426 return SessionInitRsp_V2.parse(fields.copy(), payload) 1427 except Exception as exn: 1428 pass 1429 try: 1430 return SessionInitRsp.parse(fields.copy(), payload) 1431 except Exception as exn: 1432 pass 1433 try: 1434 return SessionDeinitCmd.parse(fields.copy(), payload) 1435 except Exception as exn: 1436 pass 1437 try: 1438 return SessionDeinitRsp.parse(fields.copy(), payload) 1439 except Exception as exn: 1440 pass 1441 try: 1442 return SessionStatusNtf.parse(fields.copy(), payload) 1443 except Exception as exn: 1444 pass 1445 try: 1446 return SessionSetAppConfigCmd.parse(fields.copy(), payload) 1447 except Exception as exn: 1448 pass 1449 try: 1450 return SessionSetAppConfigRsp.parse(fields.copy(), payload) 1451 except Exception as exn: 1452 pass 1453 try: 1454 return SessionGetAppConfigCmd.parse(fields.copy(), payload) 1455 except Exception as exn: 1456 pass 1457 try: 1458 return SessionGetAppConfigRsp.parse(fields.copy(), payload) 1459 except Exception as exn: 1460 pass 1461 try: 1462 return SessionGetCountCmd.parse(fields.copy(), payload) 1463 except Exception as exn: 1464 pass 1465 try: 1466 return SessionGetCountRsp.parse(fields.copy(), payload) 1467 except Exception as exn: 1468 pass 1469 try: 1470 return SessionGetStateCmd.parse(fields.copy(), payload) 1471 except Exception as exn: 1472 pass 1473 try: 1474 return SessionGetStateRsp.parse(fields.copy(), payload) 1475 except Exception as exn: 1476 pass 1477 try: 1478 return SessionUpdateDtAnchorRangingRoundsCmd.parse(fields.copy(), payload) 1479 except Exception as exn: 1480 pass 1481 try: 1482 return SessionUpdateDtAnchorRangingRoundsRsp.parse(fields.copy(), payload) 1483 except Exception as exn: 1484 pass 1485 try: 1486 return SessionUpdateDtTagRangingRoundsCmd.parse(fields.copy(), payload) 1487 except Exception as exn: 1488 pass 1489 try: 1490 return SessionUpdateDtTagRangingRoundsRsp.parse(fields.copy(), payload) 1491 except Exception as exn: 1492 pass 1493 try: 1494 return SessionUpdateControllerMulticastListCmd.parse(fields.copy(), payload) 1495 except Exception as exn: 1496 pass 1497 try: 1498 return SessionUpdateControllerMulticastListRsp.parse(fields.copy(), payload) 1499 except Exception as exn: 1500 pass 1501 try: 1502 return SessionUpdateControllerMulticastListNtf.parse(fields.copy(), payload) 1503 except Exception as exn: 1504 pass 1505 try: 1506 return SessionQueryMaxDataSizeInRangingCmd.parse(fields.copy(), payload) 1507 except Exception as exn: 1508 pass 1509 try: 1510 return SessionQueryMaxDataSizeInRangingRsp.parse(fields.copy(), payload) 1511 except Exception as exn: 1512 pass 1513 return SessionConfigPacket(**fields), span 1514 1515 def serialize(self, payload: bytes = None) -> bytes: 1516 _span = bytearray() 1517 _span.append((self.oid << 0)) 1518 _span.extend([0] * 2) 1519 _span.extend(payload or self.payload or []) 1520 return ControlPacket.serialize(self, payload = bytes(_span)) 1521 1522 @property 1523 def size(self) -> int: 1524 return len(self.payload) + 3 1525 1526@dataclass 1527class SessionControlPacket(ControlPacket): 1528 oid: SessionControlOpcodeId = field(kw_only=True, default=SessionControlOpcodeId.START) 1529 1530 def __post_init__(self): 1531 self.gid = GroupId.SESSION_CONTROL 1532 1533 @staticmethod 1534 def parse(fields: dict, span: bytes) -> Tuple['SessionControlPacket', bytes]: 1535 if fields['gid'] != GroupId.SESSION_CONTROL: 1536 raise Exception("Invalid constraint field values") 1537 if len(span) < 3: 1538 raise Exception('Invalid packet size') 1539 fields['oid'] = SessionControlOpcodeId.from_int((span[0] >> 0) & 0x3f) 1540 value_ = int.from_bytes(span[1:3], byteorder='little') 1541 span = span[3:] 1542 payload = span 1543 span = bytes([]) 1544 fields['payload'] = payload 1545 try: 1546 return SessionDataCreditNtf.parse(fields.copy(), payload) 1547 except Exception as exn: 1548 pass 1549 try: 1550 return SessionDataTransferStatusNtf.parse(fields.copy(), payload) 1551 except Exception as exn: 1552 pass 1553 try: 1554 return SessionStartCmd.parse(fields.copy(), payload) 1555 except Exception as exn: 1556 pass 1557 try: 1558 return SessionStartRsp.parse(fields.copy(), payload) 1559 except Exception as exn: 1560 pass 1561 try: 1562 return SessionInfoNtf.parse(fields.copy(), payload) 1563 except Exception as exn: 1564 pass 1565 try: 1566 return SessionStopCmd.parse(fields.copy(), payload) 1567 except Exception as exn: 1568 pass 1569 try: 1570 return SessionStopRsp.parse(fields.copy(), payload) 1571 except Exception as exn: 1572 pass 1573 try: 1574 return SessionGetRangingCountCmd.parse(fields.copy(), payload) 1575 except Exception as exn: 1576 pass 1577 try: 1578 return SessionGetRangingCountRsp.parse(fields.copy(), payload) 1579 except Exception as exn: 1580 pass 1581 return SessionControlPacket(**fields), span 1582 1583 def serialize(self, payload: bytes = None) -> bytes: 1584 _span = bytearray() 1585 _span.append((self.oid << 0)) 1586 _span.extend([0] * 2) 1587 _span.extend(payload or self.payload or []) 1588 return ControlPacket.serialize(self, payload = bytes(_span)) 1589 1590 @property 1591 def size(self) -> int: 1592 return len(self.payload) + 3 1593 1594@dataclass 1595class AndroidPacket(ControlPacket): 1596 oid: AndroidOpcodeId = field(kw_only=True, default=AndroidOpcodeId.GET_POWER_STATS) 1597 1598 def __post_init__(self): 1599 self.gid = GroupId.VENDOR_ANDROID 1600 1601 @staticmethod 1602 def parse(fields: dict, span: bytes) -> Tuple['AndroidPacket', bytes]: 1603 if fields['gid'] != GroupId.VENDOR_ANDROID: 1604 raise Exception("Invalid constraint field values") 1605 if len(span) < 3: 1606 raise Exception('Invalid packet size') 1607 fields['oid'] = AndroidOpcodeId.from_int((span[0] >> 0) & 0x3f) 1608 value_ = int.from_bytes(span[1:3], byteorder='little') 1609 span = span[3:] 1610 payload = span 1611 span = bytes([]) 1612 fields['payload'] = payload 1613 try: 1614 return AndroidGetPowerStatsCmd.parse(fields.copy(), payload) 1615 except Exception as exn: 1616 pass 1617 try: 1618 return AndroidGetPowerStatsRsp.parse(fields.copy(), payload) 1619 except Exception as exn: 1620 pass 1621 try: 1622 return AndroidSetCountryCodeCmd.parse(fields.copy(), payload) 1623 except Exception as exn: 1624 pass 1625 try: 1626 return AndroidSetCountryCodeRsp.parse(fields.copy(), payload) 1627 except Exception as exn: 1628 pass 1629 try: 1630 return AndroidRangeDiagnosticsNtf.parse(fields.copy(), payload) 1631 except Exception as exn: 1632 pass 1633 return AndroidPacket(**fields), span 1634 1635 def serialize(self, payload: bytes = None) -> bytes: 1636 _span = bytearray() 1637 _span.append((self.oid << 0)) 1638 _span.extend([0] * 2) 1639 _span.extend(payload or self.payload or []) 1640 return ControlPacket.serialize(self, payload = bytes(_span)) 1641 1642 @property 1643 def size(self) -> int: 1644 return len(self.payload) + 3 1645 1646@dataclass 1647class CoreDeviceResetCmd(CorePacket): 1648 reset_config: ResetConfig = field(kw_only=True, default=ResetConfig.UWBS_RESET) 1649 1650 def __post_init__(self): 1651 self.mt = MessageType.COMMAND 1652 self.oid = CoreOpcodeId.DEVICE_RESET 1653 self.gid = GroupId.CORE 1654 1655 @staticmethod 1656 def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetCmd', bytes]: 1657 if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE: 1658 raise Exception("Invalid constraint field values") 1659 if len(span) < 1: 1660 raise Exception('Invalid packet size') 1661 fields['reset_config'] = ResetConfig.from_int(span[0]) 1662 span = span[1:] 1663 return CoreDeviceResetCmd(**fields), span 1664 1665 def serialize(self, payload: bytes = None) -> bytes: 1666 _span = bytearray() 1667 _span.append((self.reset_config << 0)) 1668 return CorePacket.serialize(self, payload = bytes(_span)) 1669 1670 @property 1671 def size(self) -> int: 1672 return 1 1673 1674@dataclass 1675class CoreDeviceResetRsp(CorePacket): 1676 status: Status = field(kw_only=True, default=Status.OK) 1677 1678 def __post_init__(self): 1679 self.mt = MessageType.RESPONSE 1680 self.oid = CoreOpcodeId.DEVICE_RESET 1681 self.gid = GroupId.CORE 1682 1683 @staticmethod 1684 def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceResetRsp', bytes]: 1685 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.DEVICE_RESET or fields['gid'] != GroupId.CORE: 1686 raise Exception("Invalid constraint field values") 1687 if len(span) < 1: 1688 raise Exception('Invalid packet size') 1689 fields['status'] = Status.from_int(span[0]) 1690 span = span[1:] 1691 return CoreDeviceResetRsp(**fields), span 1692 1693 def serialize(self, payload: bytes = None) -> bytes: 1694 _span = bytearray() 1695 _span.append((self.status << 0)) 1696 return CorePacket.serialize(self, payload = bytes(_span)) 1697 1698 @property 1699 def size(self) -> int: 1700 return 1 1701 1702@dataclass 1703class CoreDeviceStatusNtf(CorePacket): 1704 device_state: DeviceState = field(kw_only=True, default=DeviceState.DEVICE_STATE_READY) 1705 1706 def __post_init__(self): 1707 self.mt = MessageType.NOTIFICATION 1708 self.oid = CoreOpcodeId.DEVICE_STATUS 1709 self.gid = GroupId.CORE 1710 1711 @staticmethod 1712 def parse(fields: dict, span: bytes) -> Tuple['CoreDeviceStatusNtf', bytes]: 1713 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.DEVICE_STATUS or fields['gid'] != GroupId.CORE: 1714 raise Exception("Invalid constraint field values") 1715 if len(span) < 1: 1716 raise Exception('Invalid packet size') 1717 fields['device_state'] = DeviceState.from_int(span[0]) 1718 span = span[1:] 1719 return CoreDeviceStatusNtf(**fields), span 1720 1721 def serialize(self, payload: bytes = None) -> bytes: 1722 _span = bytearray() 1723 _span.append((self.device_state << 0)) 1724 return CorePacket.serialize(self, payload = bytes(_span)) 1725 1726 @property 1727 def size(self) -> int: 1728 return 1 1729 1730@dataclass 1731class CoreGetDeviceInfoCmd(CorePacket): 1732 1733 1734 def __post_init__(self): 1735 self.mt = MessageType.COMMAND 1736 self.oid = CoreOpcodeId.GET_DEVICE_INFO 1737 self.gid = GroupId.CORE 1738 1739 @staticmethod 1740 def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoCmd', bytes]: 1741 if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE: 1742 raise Exception("Invalid constraint field values") 1743 return CoreGetDeviceInfoCmd(**fields), span 1744 1745 def serialize(self, payload: bytes = None) -> bytes: 1746 _span = bytearray() 1747 return CorePacket.serialize(self, payload = bytes(_span)) 1748 1749 @property 1750 def size(self) -> int: 1751 return 0 1752 1753@dataclass 1754class CoreGetDeviceInfoRsp(CorePacket): 1755 status: Status = field(kw_only=True, default=Status.OK) 1756 uci_version: int = field(kw_only=True, default=0) 1757 mac_version: int = field(kw_only=True, default=0) 1758 phy_version: int = field(kw_only=True, default=0) 1759 uci_test_version: int = field(kw_only=True, default=0) 1760 vendor_spec_info: bytearray = field(kw_only=True, default_factory=bytearray) 1761 1762 def __post_init__(self): 1763 self.mt = MessageType.RESPONSE 1764 self.oid = CoreOpcodeId.GET_DEVICE_INFO 1765 self.gid = GroupId.CORE 1766 1767 @staticmethod 1768 def parse(fields: dict, span: bytes) -> Tuple['CoreGetDeviceInfoRsp', bytes]: 1769 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_DEVICE_INFO or fields['gid'] != GroupId.CORE: 1770 raise Exception("Invalid constraint field values") 1771 if len(span) < 10: 1772 raise Exception('Invalid packet size') 1773 fields['status'] = Status.from_int(span[0]) 1774 value_ = int.from_bytes(span[1:3], byteorder='little') 1775 fields['uci_version'] = value_ 1776 value_ = int.from_bytes(span[3:5], byteorder='little') 1777 fields['mac_version'] = value_ 1778 value_ = int.from_bytes(span[5:7], byteorder='little') 1779 fields['phy_version'] = value_ 1780 value_ = int.from_bytes(span[7:9], byteorder='little') 1781 fields['uci_test_version'] = value_ 1782 vendor_spec_info_count = span[9] 1783 span = span[10:] 1784 if len(span) < vendor_spec_info_count: 1785 raise Exception('Invalid packet size') 1786 fields['vendor_spec_info'] = list(span[:vendor_spec_info_count]) 1787 span = span[vendor_spec_info_count:] 1788 return CoreGetDeviceInfoRsp(**fields), span 1789 1790 def serialize(self, payload: bytes = None) -> bytes: 1791 _span = bytearray() 1792 _span.append((self.status << 0)) 1793 if self.uci_version > 65535: 1794 print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_version: {self.uci_version} > 65535; the value will be truncated") 1795 self.uci_version &= 65535 1796 _span.extend(int.to_bytes((self.uci_version << 0), length=2, byteorder='little')) 1797 if self.mac_version > 65535: 1798 print(f"Invalid value for field CoreGetDeviceInfoRsp::mac_version: {self.mac_version} > 65535; the value will be truncated") 1799 self.mac_version &= 65535 1800 _span.extend(int.to_bytes((self.mac_version << 0), length=2, byteorder='little')) 1801 if self.phy_version > 65535: 1802 print(f"Invalid value for field CoreGetDeviceInfoRsp::phy_version: {self.phy_version} > 65535; the value will be truncated") 1803 self.phy_version &= 65535 1804 _span.extend(int.to_bytes((self.phy_version << 0), length=2, byteorder='little')) 1805 if self.uci_test_version > 65535: 1806 print(f"Invalid value for field CoreGetDeviceInfoRsp::uci_test_version: {self.uci_test_version} > 65535; the value will be truncated") 1807 self.uci_test_version &= 65535 1808 _span.extend(int.to_bytes((self.uci_test_version << 0), length=2, byteorder='little')) 1809 if len(self.vendor_spec_info) > 255: 1810 print(f"Invalid length for field CoreGetDeviceInfoRsp::vendor_spec_info: {len(self.vendor_spec_info)} > 255; the array will be truncated") 1811 del self.vendor_spec_info[255:] 1812 _span.append((len(self.vendor_spec_info) << 0)) 1813 _span.extend(self.vendor_spec_info) 1814 return CorePacket.serialize(self, payload = bytes(_span)) 1815 1816 @property 1817 def size(self) -> int: 1818 return len(self.vendor_spec_info) * 1 + 10 1819 1820@dataclass 1821class CoreGetCapsInfoCmd(CorePacket): 1822 1823 1824 def __post_init__(self): 1825 self.mt = MessageType.COMMAND 1826 self.oid = CoreOpcodeId.GET_CAPS_INFO 1827 self.gid = GroupId.CORE 1828 1829 @staticmethod 1830 def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoCmd', bytes]: 1831 if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE: 1832 raise Exception("Invalid constraint field values") 1833 return CoreGetCapsInfoCmd(**fields), span 1834 1835 def serialize(self, payload: bytes = None) -> bytes: 1836 _span = bytearray() 1837 return CorePacket.serialize(self, payload = bytes(_span)) 1838 1839 @property 1840 def size(self) -> int: 1841 return 0 1842 1843@dataclass 1844class CapTlv(Packet): 1845 t: CapTlvType = field(kw_only=True, default=CapTlvType.SUPPORTED_FIRA_PHY_VERSION_RANGE) 1846 v: bytearray = field(kw_only=True, default_factory=bytearray) 1847 1848 def __post_init__(self): 1849 pass 1850 1851 @staticmethod 1852 def parse(span: bytes) -> Tuple['CapTlv', bytes]: 1853 fields = {'payload': None} 1854 if len(span) < 2: 1855 raise Exception('Invalid packet size') 1856 fields['t'] = CapTlvType.from_int(span[0]) 1857 v_count = span[1] 1858 span = span[2:] 1859 if len(span) < v_count: 1860 raise Exception('Invalid packet size') 1861 fields['v'] = list(span[:v_count]) 1862 span = span[v_count:] 1863 return CapTlv(**fields), span 1864 1865 def serialize(self, payload: bytes = None) -> bytes: 1866 _span = bytearray() 1867 _span.append((self.t << 0)) 1868 if len(self.v) > 255: 1869 print(f"Invalid length for field CapTlv::v: {len(self.v)} > 255; the array will be truncated") 1870 del self.v[255:] 1871 _span.append((len(self.v) << 0)) 1872 _span.extend(self.v) 1873 return bytes(_span) 1874 1875 @property 1876 def size(self) -> int: 1877 return len(self.v) * 1 + 2 1878 1879@dataclass 1880class CoreGetCapsInfoRsp(CorePacket): 1881 status: Status = field(kw_only=True, default=Status.OK) 1882 tlvs: List[CapTlv] = field(kw_only=True, default_factory=list) 1883 1884 def __post_init__(self): 1885 self.mt = MessageType.RESPONSE 1886 self.oid = CoreOpcodeId.GET_CAPS_INFO 1887 self.gid = GroupId.CORE 1888 1889 @staticmethod 1890 def parse(fields: dict, span: bytes) -> Tuple['CoreGetCapsInfoRsp', bytes]: 1891 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CAPS_INFO or fields['gid'] != GroupId.CORE: 1892 raise Exception("Invalid constraint field values") 1893 if len(span) < 2: 1894 raise Exception('Invalid packet size') 1895 fields['status'] = Status.from_int(span[0]) 1896 tlvs_count = span[1] 1897 span = span[2:] 1898 tlvs = [] 1899 for n in range(tlvs_count): 1900 element, span = CapTlv.parse(span) 1901 tlvs.append(element) 1902 fields['tlvs'] = tlvs 1903 return CoreGetCapsInfoRsp(**fields), span 1904 1905 def serialize(self, payload: bytes = None) -> bytes: 1906 _span = bytearray() 1907 _span.append((self.status << 0)) 1908 if len(self.tlvs) > 255: 1909 print(f"Invalid length for field CoreGetCapsInfoRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated") 1910 del self.tlvs[255:] 1911 _span.append((len(self.tlvs) << 0)) 1912 for _elt in self.tlvs: 1913 _span.extend(_elt.serialize()) 1914 return CorePacket.serialize(self, payload = bytes(_span)) 1915 1916 @property 1917 def size(self) -> int: 1918 return sum([elt.size for elt in self.tlvs]) + 2 1919 1920class ConfigParameterId(enum.IntEnum): 1921 DEVICE_STATE = 0x0 1922 LOW_POWER_MODE = 0x1 1923 1924 @staticmethod 1925 def from_int(v: int) -> Union[int, 'ConfigParameterId']: 1926 try: 1927 return ConfigParameterId(v) 1928 except ValueError as exn: 1929 return v 1930 1931 1932@dataclass 1933class ConfigParameter(Packet): 1934 id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE) 1935 value: bytearray = field(kw_only=True, default_factory=bytearray) 1936 1937 def __post_init__(self): 1938 pass 1939 1940 @staticmethod 1941 def parse(span: bytes) -> Tuple['ConfigParameter', bytes]: 1942 fields = {'payload': None} 1943 if len(span) < 2: 1944 raise Exception('Invalid packet size') 1945 fields['id'] = ConfigParameterId.from_int(span[0]) 1946 value_size = span[1] 1947 span = span[2:] 1948 if len(span) < value_size: 1949 raise Exception('Invalid packet size') 1950 fields['value'] = list(span[:value_size]) 1951 span = span[value_size:] 1952 return ConfigParameter(**fields), span 1953 1954 def serialize(self, payload: bytes = None) -> bytes: 1955 _span = bytearray() 1956 _span.append((self.id << 0)) 1957 _span.append(((len(self.value) * 1) << 0)) 1958 _span.extend(self.value) 1959 return bytes(_span) 1960 1961 @property 1962 def size(self) -> int: 1963 return len(self.value) * 1 + 2 1964 1965@dataclass 1966class CoreSetConfigCmd(CorePacket): 1967 parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list) 1968 1969 def __post_init__(self): 1970 self.mt = MessageType.COMMAND 1971 self.oid = CoreOpcodeId.SET_CONFIG 1972 self.gid = GroupId.CORE 1973 1974 @staticmethod 1975 def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigCmd', bytes]: 1976 if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE: 1977 raise Exception("Invalid constraint field values") 1978 if len(span) < 1: 1979 raise Exception('Invalid packet size') 1980 parameters_count = span[0] 1981 span = span[1:] 1982 parameters = [] 1983 for n in range(parameters_count): 1984 element, span = ConfigParameter.parse(span) 1985 parameters.append(element) 1986 fields['parameters'] = parameters 1987 return CoreSetConfigCmd(**fields), span 1988 1989 def serialize(self, payload: bytes = None) -> bytes: 1990 _span = bytearray() 1991 if len(self.parameters) > 255: 1992 print(f"Invalid length for field CoreSetConfigCmd::parameters: {len(self.parameters)} > 255; the array will be truncated") 1993 del self.parameters[255:] 1994 _span.append((len(self.parameters) << 0)) 1995 for _elt in self.parameters: 1996 _span.extend(_elt.serialize()) 1997 return CorePacket.serialize(self, payload = bytes(_span)) 1998 1999 @property 2000 def size(self) -> int: 2001 return sum([elt.size for elt in self.parameters]) + 1 2002 2003@dataclass 2004class ConfigParameterStatus(Packet): 2005 id: ConfigParameterId = field(kw_only=True, default=ConfigParameterId.DEVICE_STATE) 2006 status: Status = field(kw_only=True, default=Status.OK) 2007 2008 def __post_init__(self): 2009 pass 2010 2011 @staticmethod 2012 def parse(span: bytes) -> Tuple['ConfigParameterStatus', bytes]: 2013 fields = {'payload': None} 2014 if len(span) < 2: 2015 raise Exception('Invalid packet size') 2016 fields['id'] = ConfigParameterId.from_int(span[0]) 2017 fields['status'] = Status.from_int(span[1]) 2018 span = span[2:] 2019 return ConfigParameterStatus(**fields), span 2020 2021 def serialize(self, payload: bytes = None) -> bytes: 2022 _span = bytearray() 2023 _span.append((self.id << 0)) 2024 _span.append((self.status << 0)) 2025 return bytes(_span) 2026 2027 @property 2028 def size(self) -> int: 2029 return 2 2030 2031@dataclass 2032class CoreSetConfigRsp(CorePacket): 2033 status: Status = field(kw_only=True, default=Status.OK) 2034 parameters: List[ConfigParameterStatus] = field(kw_only=True, default_factory=list) 2035 2036 def __post_init__(self): 2037 self.mt = MessageType.RESPONSE 2038 self.oid = CoreOpcodeId.SET_CONFIG 2039 self.gid = GroupId.CORE 2040 2041 @staticmethod 2042 def parse(fields: dict, span: bytes) -> Tuple['CoreSetConfigRsp', bytes]: 2043 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.SET_CONFIG or fields['gid'] != GroupId.CORE: 2044 raise Exception("Invalid constraint field values") 2045 if len(span) < 2: 2046 raise Exception('Invalid packet size') 2047 fields['status'] = Status.from_int(span[0]) 2048 parameters_count = span[1] 2049 span = span[2:] 2050 if len(span) < parameters_count * 2: 2051 raise Exception('Invalid packet size') 2052 parameters = [] 2053 for n in range(parameters_count): 2054 parameters.append(ConfigParameterStatus.parse_all(span[n * 2:(n + 1) * 2])) 2055 fields['parameters'] = parameters 2056 span = span[parameters_count * 2:] 2057 return CoreSetConfigRsp(**fields), span 2058 2059 def serialize(self, payload: bytes = None) -> bytes: 2060 _span = bytearray() 2061 _span.append((self.status << 0)) 2062 if len(self.parameters) > 255: 2063 print(f"Invalid length for field CoreSetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated") 2064 del self.parameters[255:] 2065 _span.append((len(self.parameters) << 0)) 2066 for _elt in self.parameters: 2067 _span.extend(_elt.serialize()) 2068 return CorePacket.serialize(self, payload = bytes(_span)) 2069 2070 @property 2071 def size(self) -> int: 2072 return sum([elt.size for elt in self.parameters]) + 2 2073 2074@dataclass 2075class CoreGetConfigCmd(CorePacket): 2076 parameter_ids: List[ConfigParameterId] = field(kw_only=True, default_factory=list) 2077 2078 def __post_init__(self): 2079 self.mt = MessageType.COMMAND 2080 self.oid = CoreOpcodeId.GET_CONFIG 2081 self.gid = GroupId.CORE 2082 2083 @staticmethod 2084 def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigCmd', bytes]: 2085 if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE: 2086 raise Exception("Invalid constraint field values") 2087 if len(span) < 1: 2088 raise Exception('Invalid packet size') 2089 parameter_ids_count = span[0] 2090 span = span[1:] 2091 if len(span) < parameter_ids_count: 2092 raise Exception('Invalid packet size') 2093 parameter_ids = [] 2094 for n in range(parameter_ids_count): 2095 parameter_ids.append(ConfigParameterId(int.from_bytes(span[n:n + 1], byteorder='little'))) 2096 fields['parameter_ids'] = parameter_ids 2097 span = span[parameter_ids_count:] 2098 return CoreGetConfigCmd(**fields), span 2099 2100 def serialize(self, payload: bytes = None) -> bytes: 2101 _span = bytearray() 2102 if len(self.parameter_ids) > 255: 2103 print(f"Invalid length for field CoreGetConfigCmd::parameter_ids: {len(self.parameter_ids)} > 255; the array will be truncated") 2104 del self.parameter_ids[255:] 2105 _span.append((len(self.parameter_ids) << 0)) 2106 for _elt in self.parameter_ids: 2107 _span.append(_elt) 2108 return CorePacket.serialize(self, payload = bytes(_span)) 2109 2110 @property 2111 def size(self) -> int: 2112 return len(self.parameter_ids) * 8 + 1 2113 2114@dataclass 2115class CoreGetConfigRsp(CorePacket): 2116 status: Status = field(kw_only=True, default=Status.OK) 2117 parameters: List[ConfigParameter] = field(kw_only=True, default_factory=list) 2118 2119 def __post_init__(self): 2120 self.mt = MessageType.RESPONSE 2121 self.oid = CoreOpcodeId.GET_CONFIG 2122 self.gid = GroupId.CORE 2123 2124 @staticmethod 2125 def parse(fields: dict, span: bytes) -> Tuple['CoreGetConfigRsp', bytes]: 2126 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.GET_CONFIG or fields['gid'] != GroupId.CORE: 2127 raise Exception("Invalid constraint field values") 2128 if len(span) < 2: 2129 raise Exception('Invalid packet size') 2130 fields['status'] = Status.from_int(span[0]) 2131 parameters_count = span[1] 2132 span = span[2:] 2133 parameters = [] 2134 for n in range(parameters_count): 2135 element, span = ConfigParameter.parse(span) 2136 parameters.append(element) 2137 fields['parameters'] = parameters 2138 return CoreGetConfigRsp(**fields), span 2139 2140 def serialize(self, payload: bytes = None) -> bytes: 2141 _span = bytearray() 2142 _span.append((self.status << 0)) 2143 if len(self.parameters) > 255: 2144 print(f"Invalid length for field CoreGetConfigRsp::parameters: {len(self.parameters)} > 255; the array will be truncated") 2145 del self.parameters[255:] 2146 _span.append((len(self.parameters) << 0)) 2147 for _elt in self.parameters: 2148 _span.extend(_elt.serialize()) 2149 return CorePacket.serialize(self, payload = bytes(_span)) 2150 2151 @property 2152 def size(self) -> int: 2153 return sum([elt.size for elt in self.parameters]) + 2 2154 2155@dataclass 2156class CoreGenericErrorNtf(CorePacket): 2157 status: Status = field(kw_only=True, default=Status.OK) 2158 2159 def __post_init__(self): 2160 self.mt = MessageType.NOTIFICATION 2161 self.oid = CoreOpcodeId.GENERIC_ERROR 2162 self.gid = GroupId.CORE 2163 2164 @staticmethod 2165 def parse(fields: dict, span: bytes) -> Tuple['CoreGenericErrorNtf', bytes]: 2166 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != CoreOpcodeId.GENERIC_ERROR or fields['gid'] != GroupId.CORE: 2167 raise Exception("Invalid constraint field values") 2168 if len(span) < 1: 2169 raise Exception('Invalid packet size') 2170 fields['status'] = Status.from_int(span[0]) 2171 span = span[1:] 2172 return CoreGenericErrorNtf(**fields), span 2173 2174 def serialize(self, payload: bytes = None) -> bytes: 2175 _span = bytearray() 2176 _span.append((self.status << 0)) 2177 return CorePacket.serialize(self, payload = bytes(_span)) 2178 2179 @property 2180 def size(self) -> int: 2181 return 1 2182 2183@dataclass 2184class CoreQueryTimeStampCmd(CorePacket): 2185 2186 2187 def __post_init__(self): 2188 self.mt = MessageType.COMMAND 2189 self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP 2190 self.gid = GroupId.CORE 2191 2192 @staticmethod 2193 def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampCmd', bytes]: 2194 if fields['mt'] != MessageType.COMMAND or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE: 2195 raise Exception("Invalid constraint field values") 2196 return CoreQueryTimeStampCmd(**fields), span 2197 2198 def serialize(self, payload: bytes = None) -> bytes: 2199 _span = bytearray() 2200 return CorePacket.serialize(self, payload = bytes(_span)) 2201 2202 @property 2203 def size(self) -> int: 2204 return 0 2205 2206@dataclass 2207class CoreQueryTimeStampRsp(CorePacket): 2208 status: Status = field(kw_only=True, default=Status.OK) 2209 timeStamp: int = field(kw_only=True, default=0) 2210 2211 def __post_init__(self): 2212 self.mt = MessageType.RESPONSE 2213 self.oid = CoreOpcodeId.QUERY_UWBS_TIMESTAMP 2214 self.gid = GroupId.CORE 2215 2216 @staticmethod 2217 def parse(fields: dict, span: bytes) -> Tuple['CoreQueryTimeStampRsp', bytes]: 2218 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != CoreOpcodeId.QUERY_UWBS_TIMESTAMP or fields['gid'] != GroupId.CORE: 2219 raise Exception("Invalid constraint field values") 2220 if len(span) < 9: 2221 raise Exception('Invalid packet size') 2222 fields['status'] = Status.from_int(span[0]) 2223 value_ = int.from_bytes(span[1:9], byteorder='little') 2224 fields['timeStamp'] = value_ 2225 span = span[9:] 2226 return CoreQueryTimeStampRsp(**fields), span 2227 2228 def serialize(self, payload: bytes = None) -> bytes: 2229 _span = bytearray() 2230 _span.append((self.status << 0)) 2231 if self.timeStamp > 18446744073709551615: 2232 print(f"Invalid value for field CoreQueryTimeStampRsp::timeStamp: {self.timeStamp} > 18446744073709551615; the value will be truncated") 2233 self.timeStamp &= 18446744073709551615 2234 _span.extend(int.to_bytes((self.timeStamp << 0), length=8, byteorder='little')) 2235 return CorePacket.serialize(self, payload = bytes(_span)) 2236 2237 @property 2238 def size(self) -> int: 2239 return 9 2240 2241@dataclass 2242class SessionInitCmd(SessionConfigPacket): 2243 session_id: int = field(kw_only=True, default=0) 2244 session_type: SessionType = field(kw_only=True, default=SessionType.FIRA_RANGING_SESSION) 2245 2246 def __post_init__(self): 2247 self.mt = MessageType.COMMAND 2248 self.oid = SessionConfigOpcodeId.INIT 2249 self.gid = GroupId.SESSION_CONFIG 2250 2251 @staticmethod 2252 def parse(fields: dict, span: bytes) -> Tuple['SessionInitCmd', bytes]: 2253 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG: 2254 raise Exception("Invalid constraint field values") 2255 if len(span) < 5: 2256 raise Exception('Invalid packet size') 2257 value_ = int.from_bytes(span[0:4], byteorder='little') 2258 fields['session_id'] = value_ 2259 fields['session_type'] = SessionType.from_int(span[4]) 2260 span = span[5:] 2261 return SessionInitCmd(**fields), span 2262 2263 def serialize(self, payload: bytes = None) -> bytes: 2264 _span = bytearray() 2265 if self.session_id > 4294967295: 2266 print(f"Invalid value for field SessionInitCmd::session_id: {self.session_id} > 4294967295; the value will be truncated") 2267 self.session_id &= 4294967295 2268 _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) 2269 _span.append((self.session_type << 0)) 2270 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2271 2272 @property 2273 def size(self) -> int: 2274 return 5 2275 2276@dataclass 2277class SessionInitRsp_V2(SessionConfigPacket): 2278 status: Status = field(kw_only=True, default=Status.OK) 2279 session_handle: int = field(kw_only=True, default=0) 2280 2281 def __post_init__(self): 2282 self.mt = MessageType.RESPONSE 2283 self.oid = SessionConfigOpcodeId.INIT 2284 self.gid = GroupId.SESSION_CONFIG 2285 2286 @staticmethod 2287 def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp_V2', bytes]: 2288 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG: 2289 raise Exception("Invalid constraint field values") 2290 if len(span) < 5: 2291 raise Exception('Invalid packet size') 2292 fields['status'] = Status.from_int(span[0]) 2293 value_ = int.from_bytes(span[1:5], byteorder='little') 2294 fields['session_handle'] = value_ 2295 span = span[5:] 2296 return SessionInitRsp_V2(**fields), span 2297 2298 def serialize(self, payload: bytes = None) -> bytes: 2299 _span = bytearray() 2300 _span.append((self.status << 0)) 2301 if self.session_handle > 4294967295: 2302 print(f"Invalid value for field SessionInitRsp_V2::session_handle: {self.session_handle} > 4294967295; the value will be truncated") 2303 self.session_handle &= 4294967295 2304 _span.extend(int.to_bytes((self.session_handle << 0), length=4, byteorder='little')) 2305 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2306 2307 @property 2308 def size(self) -> int: 2309 return 5 2310 2311@dataclass 2312class SessionInitRsp(SessionConfigPacket): 2313 status: Status = field(kw_only=True, default=Status.OK) 2314 2315 def __post_init__(self): 2316 self.mt = MessageType.RESPONSE 2317 self.oid = SessionConfigOpcodeId.INIT 2318 self.gid = GroupId.SESSION_CONFIG 2319 2320 @staticmethod 2321 def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp', bytes]: 2322 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.INIT or fields['gid'] != GroupId.SESSION_CONFIG: 2323 raise Exception("Invalid constraint field values") 2324 if len(span) < 1: 2325 raise Exception('Invalid packet size') 2326 fields['status'] = Status.from_int(span[0]) 2327 span = span[1:] 2328 return SessionInitRsp(**fields), span 2329 2330 def serialize(self, payload: bytes = None) -> bytes: 2331 _span = bytearray() 2332 _span.append((self.status << 0)) 2333 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2334 2335 @property 2336 def size(self) -> int: 2337 return 1 2338 2339@dataclass 2340class SessionDeinitCmd(SessionConfigPacket): 2341 session_token: int = field(kw_only=True, default=0) 2342 2343 def __post_init__(self): 2344 self.mt = MessageType.COMMAND 2345 self.oid = SessionConfigOpcodeId.DEINIT 2346 self.gid = GroupId.SESSION_CONFIG 2347 2348 @staticmethod 2349 def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitCmd', bytes]: 2350 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG: 2351 raise Exception("Invalid constraint field values") 2352 if len(span) < 4: 2353 raise Exception('Invalid packet size') 2354 value_ = int.from_bytes(span[0:4], byteorder='little') 2355 fields['session_token'] = value_ 2356 span = span[4:] 2357 return SessionDeinitCmd(**fields), span 2358 2359 def serialize(self, payload: bytes = None) -> bytes: 2360 _span = bytearray() 2361 if self.session_token > 4294967295: 2362 print(f"Invalid value for field SessionDeinitCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") 2363 self.session_token &= 4294967295 2364 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 2365 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2366 2367 @property 2368 def size(self) -> int: 2369 return 4 2370 2371@dataclass 2372class SessionDeinitRsp(SessionConfigPacket): 2373 status: Status = field(kw_only=True, default=Status.OK) 2374 2375 def __post_init__(self): 2376 self.mt = MessageType.RESPONSE 2377 self.oid = SessionConfigOpcodeId.DEINIT 2378 self.gid = GroupId.SESSION_CONFIG 2379 2380 @staticmethod 2381 def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitRsp', bytes]: 2382 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.DEINIT or fields['gid'] != GroupId.SESSION_CONFIG: 2383 raise Exception("Invalid constraint field values") 2384 if len(span) < 1: 2385 raise Exception('Invalid packet size') 2386 fields['status'] = Status.from_int(span[0]) 2387 span = span[1:] 2388 return SessionDeinitRsp(**fields), span 2389 2390 def serialize(self, payload: bytes = None) -> bytes: 2391 _span = bytearray() 2392 _span.append((self.status << 0)) 2393 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2394 2395 @property 2396 def size(self) -> int: 2397 return 1 2398 2399@dataclass 2400class SessionStatusNtf(SessionConfigPacket): 2401 session_token: int = field(kw_only=True, default=0) 2402 session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT) 2403 reason_code: int = field(kw_only=True, default=0) 2404 2405 def __post_init__(self): 2406 self.mt = MessageType.NOTIFICATION 2407 self.oid = SessionConfigOpcodeId.STATUS 2408 self.gid = GroupId.SESSION_CONFIG 2409 2410 @staticmethod 2411 def parse(fields: dict, span: bytes) -> Tuple['SessionStatusNtf', bytes]: 2412 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.STATUS or fields['gid'] != GroupId.SESSION_CONFIG: 2413 raise Exception("Invalid constraint field values") 2414 if len(span) < 6: 2415 raise Exception('Invalid packet size') 2416 value_ = int.from_bytes(span[0:4], byteorder='little') 2417 fields['session_token'] = value_ 2418 fields['session_state'] = SessionState.from_int(span[4]) 2419 fields['reason_code'] = span[5] 2420 span = span[6:] 2421 return SessionStatusNtf(**fields), span 2422 2423 def serialize(self, payload: bytes = None) -> bytes: 2424 _span = bytearray() 2425 if self.session_token > 4294967295: 2426 print(f"Invalid value for field SessionStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") 2427 self.session_token &= 4294967295 2428 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 2429 _span.append((self.session_state << 0)) 2430 if self.reason_code > 255: 2431 print(f"Invalid value for field SessionStatusNtf::reason_code: {self.reason_code} > 255; the value will be truncated") 2432 self.reason_code &= 255 2433 _span.append((self.reason_code << 0)) 2434 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2435 2436 @property 2437 def size(self) -> int: 2438 return 6 2439 2440@dataclass 2441class AppConfigTlv(Packet): 2442 cfg_id: AppConfigTlvType = field(kw_only=True, default=AppConfigTlvType.DEVICE_TYPE) 2443 v: bytearray = field(kw_only=True, default_factory=bytearray) 2444 2445 def __post_init__(self): 2446 pass 2447 2448 @staticmethod 2449 def parse(span: bytes) -> Tuple['AppConfigTlv', bytes]: 2450 fields = {'payload': None} 2451 if len(span) < 2: 2452 raise Exception('Invalid packet size') 2453 fields['cfg_id'] = AppConfigTlvType.from_int(span[0]) 2454 v_count = span[1] 2455 span = span[2:] 2456 if len(span) < v_count: 2457 raise Exception('Invalid packet size') 2458 fields['v'] = list(span[:v_count]) 2459 span = span[v_count:] 2460 return AppConfigTlv(**fields), span 2461 2462 def serialize(self, payload: bytes = None) -> bytes: 2463 _span = bytearray() 2464 _span.append((self.cfg_id << 0)) 2465 if len(self.v) > 255: 2466 print(f"Invalid length for field AppConfigTlv::v: {len(self.v)} > 255; the array will be truncated") 2467 del self.v[255:] 2468 _span.append((len(self.v) << 0)) 2469 _span.extend(self.v) 2470 return bytes(_span) 2471 2472 @property 2473 def size(self) -> int: 2474 return len(self.v) * 1 + 2 2475 2476@dataclass 2477class SessionSetAppConfigCmd(SessionConfigPacket): 2478 session_token: int = field(kw_only=True, default=0) 2479 tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list) 2480 2481 def __post_init__(self): 2482 self.mt = MessageType.COMMAND 2483 self.oid = SessionConfigOpcodeId.SET_APP_CONFIG 2484 self.gid = GroupId.SESSION_CONFIG 2485 2486 @staticmethod 2487 def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigCmd', bytes]: 2488 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: 2489 raise Exception("Invalid constraint field values") 2490 if len(span) < 5: 2491 raise Exception('Invalid packet size') 2492 value_ = int.from_bytes(span[0:4], byteorder='little') 2493 fields['session_token'] = value_ 2494 tlvs_count = span[4] 2495 span = span[5:] 2496 tlvs = [] 2497 for n in range(tlvs_count): 2498 element, span = AppConfigTlv.parse(span) 2499 tlvs.append(element) 2500 fields['tlvs'] = tlvs 2501 return SessionSetAppConfigCmd(**fields), span 2502 2503 def serialize(self, payload: bytes = None) -> bytes: 2504 _span = bytearray() 2505 if self.session_token > 4294967295: 2506 print(f"Invalid value for field SessionSetAppConfigCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") 2507 self.session_token &= 4294967295 2508 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 2509 if len(self.tlvs) > 255: 2510 print(f"Invalid length for field SessionSetAppConfigCmd::tlvs: {len(self.tlvs)} > 255; the array will be truncated") 2511 del self.tlvs[255:] 2512 _span.append((len(self.tlvs) << 0)) 2513 for _elt in self.tlvs: 2514 _span.extend(_elt.serialize()) 2515 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2516 2517 @property 2518 def size(self) -> int: 2519 return sum([elt.size for elt in self.tlvs]) + 5 2520 2521@dataclass 2522class AppConfigStatus(Packet): 2523 cfg_id: AppConfigTlvType = field(kw_only=True, default=AppConfigTlvType.DEVICE_TYPE) 2524 status: Status = field(kw_only=True, default=Status.OK) 2525 2526 def __post_init__(self): 2527 pass 2528 2529 @staticmethod 2530 def parse(span: bytes) -> Tuple['AppConfigStatus', bytes]: 2531 fields = {'payload': None} 2532 if len(span) < 2: 2533 raise Exception('Invalid packet size') 2534 fields['cfg_id'] = AppConfigTlvType.from_int(span[0]) 2535 fields['status'] = Status.from_int(span[1]) 2536 span = span[2:] 2537 return AppConfigStatus(**fields), span 2538 2539 def serialize(self, payload: bytes = None) -> bytes: 2540 _span = bytearray() 2541 _span.append((self.cfg_id << 0)) 2542 _span.append((self.status << 0)) 2543 return bytes(_span) 2544 2545 @property 2546 def size(self) -> int: 2547 return 2 2548 2549@dataclass 2550class SessionSetAppConfigRsp(SessionConfigPacket): 2551 status: Status = field(kw_only=True, default=Status.OK) 2552 cfg_status: List[AppConfigStatus] = field(kw_only=True, default_factory=list) 2553 2554 def __post_init__(self): 2555 self.mt = MessageType.RESPONSE 2556 self.oid = SessionConfigOpcodeId.SET_APP_CONFIG 2557 self.gid = GroupId.SESSION_CONFIG 2558 2559 @staticmethod 2560 def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigRsp', bytes]: 2561 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.SET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: 2562 raise Exception("Invalid constraint field values") 2563 if len(span) < 2: 2564 raise Exception('Invalid packet size') 2565 fields['status'] = Status.from_int(span[0]) 2566 cfg_status_count = span[1] 2567 span = span[2:] 2568 if len(span) < cfg_status_count * 2: 2569 raise Exception('Invalid packet size') 2570 cfg_status = [] 2571 for n in range(cfg_status_count): 2572 cfg_status.append(AppConfigStatus.parse_all(span[n * 2:(n + 1) * 2])) 2573 fields['cfg_status'] = cfg_status 2574 span = span[cfg_status_count * 2:] 2575 return SessionSetAppConfigRsp(**fields), span 2576 2577 def serialize(self, payload: bytes = None) -> bytes: 2578 _span = bytearray() 2579 _span.append((self.status << 0)) 2580 if len(self.cfg_status) > 255: 2581 print(f"Invalid length for field SessionSetAppConfigRsp::cfg_status: {len(self.cfg_status)} > 255; the array will be truncated") 2582 del self.cfg_status[255:] 2583 _span.append((len(self.cfg_status) << 0)) 2584 for _elt in self.cfg_status: 2585 _span.extend(_elt.serialize()) 2586 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2587 2588 @property 2589 def size(self) -> int: 2590 return sum([elt.size for elt in self.cfg_status]) + 2 2591 2592@dataclass 2593class SessionGetAppConfigCmd(SessionConfigPacket): 2594 session_token: int = field(kw_only=True, default=0) 2595 app_cfg: List[AppConfigTlvType] = field(kw_only=True, default_factory=list) 2596 2597 def __post_init__(self): 2598 self.mt = MessageType.COMMAND 2599 self.oid = SessionConfigOpcodeId.GET_APP_CONFIG 2600 self.gid = GroupId.SESSION_CONFIG 2601 2602 @staticmethod 2603 def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigCmd', bytes]: 2604 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: 2605 raise Exception("Invalid constraint field values") 2606 if len(span) < 5: 2607 raise Exception('Invalid packet size') 2608 value_ = int.from_bytes(span[0:4], byteorder='little') 2609 fields['session_token'] = value_ 2610 app_cfg_count = span[4] 2611 span = span[5:] 2612 if len(span) < app_cfg_count: 2613 raise Exception('Invalid packet size') 2614 app_cfg = [] 2615 for n in range(app_cfg_count): 2616 app_cfg.append(AppConfigTlvType(int.from_bytes(span[n:n + 1], byteorder='little'))) 2617 fields['app_cfg'] = app_cfg 2618 span = span[app_cfg_count:] 2619 return SessionGetAppConfigCmd(**fields), span 2620 2621 def serialize(self, payload: bytes = None) -> bytes: 2622 _span = bytearray() 2623 if self.session_token > 4294967295: 2624 print(f"Invalid value for field SessionGetAppConfigCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") 2625 self.session_token &= 4294967295 2626 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 2627 if len(self.app_cfg) > 255: 2628 print(f"Invalid length for field SessionGetAppConfigCmd::app_cfg: {len(self.app_cfg)} > 255; the array will be truncated") 2629 del self.app_cfg[255:] 2630 _span.append((len(self.app_cfg) << 0)) 2631 for _elt in self.app_cfg: 2632 _span.append(_elt) 2633 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2634 2635 @property 2636 def size(self) -> int: 2637 return len(self.app_cfg) * 8 + 5 2638 2639@dataclass 2640class SessionGetAppConfigRsp(SessionConfigPacket): 2641 status: Status = field(kw_only=True, default=Status.OK) 2642 tlvs: List[AppConfigTlv] = field(kw_only=True, default_factory=list) 2643 2644 def __post_init__(self): 2645 self.mt = MessageType.RESPONSE 2646 self.oid = SessionConfigOpcodeId.GET_APP_CONFIG 2647 self.gid = GroupId.SESSION_CONFIG 2648 2649 @staticmethod 2650 def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigRsp', bytes]: 2651 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_APP_CONFIG or fields['gid'] != GroupId.SESSION_CONFIG: 2652 raise Exception("Invalid constraint field values") 2653 if len(span) < 2: 2654 raise Exception('Invalid packet size') 2655 fields['status'] = Status.from_int(span[0]) 2656 tlvs_count = span[1] 2657 span = span[2:] 2658 tlvs = [] 2659 for n in range(tlvs_count): 2660 element, span = AppConfigTlv.parse(span) 2661 tlvs.append(element) 2662 fields['tlvs'] = tlvs 2663 return SessionGetAppConfigRsp(**fields), span 2664 2665 def serialize(self, payload: bytes = None) -> bytes: 2666 _span = bytearray() 2667 _span.append((self.status << 0)) 2668 if len(self.tlvs) > 255: 2669 print(f"Invalid length for field SessionGetAppConfigRsp::tlvs: {len(self.tlvs)} > 255; the array will be truncated") 2670 del self.tlvs[255:] 2671 _span.append((len(self.tlvs) << 0)) 2672 for _elt in self.tlvs: 2673 _span.extend(_elt.serialize()) 2674 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2675 2676 @property 2677 def size(self) -> int: 2678 return sum([elt.size for elt in self.tlvs]) + 2 2679 2680@dataclass 2681class SessionGetCountCmd(SessionConfigPacket): 2682 2683 2684 def __post_init__(self): 2685 self.mt = MessageType.COMMAND 2686 self.oid = SessionConfigOpcodeId.GET_COUNT 2687 self.gid = GroupId.SESSION_CONFIG 2688 2689 @staticmethod 2690 def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountCmd', bytes]: 2691 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG: 2692 raise Exception("Invalid constraint field values") 2693 return SessionGetCountCmd(**fields), span 2694 2695 def serialize(self, payload: bytes = None) -> bytes: 2696 _span = bytearray() 2697 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2698 2699 @property 2700 def size(self) -> int: 2701 return 0 2702 2703@dataclass 2704class SessionGetCountRsp(SessionConfigPacket): 2705 status: Status = field(kw_only=True, default=Status.OK) 2706 session_count: int = field(kw_only=True, default=0) 2707 2708 def __post_init__(self): 2709 self.mt = MessageType.RESPONSE 2710 self.oid = SessionConfigOpcodeId.GET_COUNT 2711 self.gid = GroupId.SESSION_CONFIG 2712 2713 @staticmethod 2714 def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountRsp', bytes]: 2715 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_COUNT or fields['gid'] != GroupId.SESSION_CONFIG: 2716 raise Exception("Invalid constraint field values") 2717 if len(span) < 2: 2718 raise Exception('Invalid packet size') 2719 fields['status'] = Status.from_int(span[0]) 2720 fields['session_count'] = span[1] 2721 span = span[2:] 2722 return SessionGetCountRsp(**fields), span 2723 2724 def serialize(self, payload: bytes = None) -> bytes: 2725 _span = bytearray() 2726 _span.append((self.status << 0)) 2727 if self.session_count > 255: 2728 print(f"Invalid value for field SessionGetCountRsp::session_count: {self.session_count} > 255; the value will be truncated") 2729 self.session_count &= 255 2730 _span.append((self.session_count << 0)) 2731 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2732 2733 @property 2734 def size(self) -> int: 2735 return 2 2736 2737@dataclass 2738class SessionGetStateCmd(SessionConfigPacket): 2739 session_token: int = field(kw_only=True, default=0) 2740 2741 def __post_init__(self): 2742 self.mt = MessageType.COMMAND 2743 self.oid = SessionConfigOpcodeId.GET_STATE 2744 self.gid = GroupId.SESSION_CONFIG 2745 2746 @staticmethod 2747 def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateCmd', bytes]: 2748 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG: 2749 raise Exception("Invalid constraint field values") 2750 if len(span) < 4: 2751 raise Exception('Invalid packet size') 2752 value_ = int.from_bytes(span[0:4], byteorder='little') 2753 fields['session_token'] = value_ 2754 span = span[4:] 2755 return SessionGetStateCmd(**fields), span 2756 2757 def serialize(self, payload: bytes = None) -> bytes: 2758 _span = bytearray() 2759 if self.session_token > 4294967295: 2760 print(f"Invalid value for field SessionGetStateCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") 2761 self.session_token &= 4294967295 2762 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 2763 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2764 2765 @property 2766 def size(self) -> int: 2767 return 4 2768 2769@dataclass 2770class SessionGetStateRsp(SessionConfigPacket): 2771 status: Status = field(kw_only=True, default=Status.OK) 2772 session_state: SessionState = field(kw_only=True, default=SessionState.SESSION_STATE_INIT) 2773 2774 def __post_init__(self): 2775 self.mt = MessageType.RESPONSE 2776 self.oid = SessionConfigOpcodeId.GET_STATE 2777 self.gid = GroupId.SESSION_CONFIG 2778 2779 @staticmethod 2780 def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateRsp', bytes]: 2781 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.GET_STATE or fields['gid'] != GroupId.SESSION_CONFIG: 2782 raise Exception("Invalid constraint field values") 2783 if len(span) < 2: 2784 raise Exception('Invalid packet size') 2785 fields['status'] = Status.from_int(span[0]) 2786 fields['session_state'] = SessionState.from_int(span[1]) 2787 span = span[2:] 2788 return SessionGetStateRsp(**fields), span 2789 2790 def serialize(self, payload: bytes = None) -> bytes: 2791 _span = bytearray() 2792 _span.append((self.status << 0)) 2793 _span.append((self.session_state << 0)) 2794 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2795 2796 @property 2797 def size(self) -> int: 2798 return 2 2799 2800@dataclass 2801class SessionUpdateDtAnchorRangingRoundsCmd(SessionConfigPacket): 2802 2803 2804 def __post_init__(self): 2805 self.mt = MessageType.COMMAND 2806 self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS 2807 self.gid = GroupId.SESSION_CONFIG 2808 2809 @staticmethod 2810 def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsCmd', bytes]: 2811 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: 2812 raise Exception("Invalid constraint field values") 2813 return SessionUpdateDtAnchorRangingRoundsCmd(**fields), span 2814 2815 def serialize(self, payload: bytes = None) -> bytes: 2816 _span = bytearray() 2817 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2818 2819 @property 2820 def size(self) -> int: 2821 return 0 2822 2823@dataclass 2824class SessionUpdateDtAnchorRangingRoundsRsp(SessionConfigPacket): 2825 2826 2827 def __post_init__(self): 2828 self.mt = MessageType.RESPONSE 2829 self.oid = SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS 2830 self.gid = GroupId.SESSION_CONFIG 2831 2832 @staticmethod 2833 def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtAnchorRangingRoundsRsp', bytes]: 2834 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_ANCHOR_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: 2835 raise Exception("Invalid constraint field values") 2836 return SessionUpdateDtAnchorRangingRoundsRsp(**fields), span 2837 2838 def serialize(self, payload: bytes = None) -> bytes: 2839 _span = bytearray() 2840 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2841 2842 @property 2843 def size(self) -> int: 2844 return 0 2845 2846@dataclass 2847class SessionUpdateDtTagRangingRoundsCmd(SessionConfigPacket): 2848 session_token: int = field(kw_only=True, default=0) 2849 ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray) 2850 2851 def __post_init__(self): 2852 self.mt = MessageType.COMMAND 2853 self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS 2854 self.gid = GroupId.SESSION_CONFIG 2855 2856 @staticmethod 2857 def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsCmd', bytes]: 2858 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: 2859 raise Exception("Invalid constraint field values") 2860 if len(span) < 5: 2861 raise Exception('Invalid packet size') 2862 value_ = int.from_bytes(span[0:4], byteorder='little') 2863 fields['session_token'] = value_ 2864 ranging_round_indexes_count = span[4] 2865 span = span[5:] 2866 if len(span) < ranging_round_indexes_count: 2867 raise Exception('Invalid packet size') 2868 fields['ranging_round_indexes'] = list(span[:ranging_round_indexes_count]) 2869 span = span[ranging_round_indexes_count:] 2870 return SessionUpdateDtTagRangingRoundsCmd(**fields), span 2871 2872 def serialize(self, payload: bytes = None) -> bytes: 2873 _span = bytearray() 2874 if self.session_token > 4294967295: 2875 print(f"Invalid value for field SessionUpdateDtTagRangingRoundsCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") 2876 self.session_token &= 4294967295 2877 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 2878 if len(self.ranging_round_indexes) > 255: 2879 print(f"Invalid length for field SessionUpdateDtTagRangingRoundsCmd::ranging_round_indexes: {len(self.ranging_round_indexes)} > 255; the array will be truncated") 2880 del self.ranging_round_indexes[255:] 2881 _span.append((len(self.ranging_round_indexes) << 0)) 2882 _span.extend(self.ranging_round_indexes) 2883 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2884 2885 @property 2886 def size(self) -> int: 2887 return len(self.ranging_round_indexes) * 1 + 5 2888 2889@dataclass 2890class SessionUpdateDtTagRangingRoundsRsp(SessionConfigPacket): 2891 status: Status = field(kw_only=True, default=Status.OK) 2892 ranging_round_indexes: bytearray = field(kw_only=True, default_factory=bytearray) 2893 2894 def __post_init__(self): 2895 self.mt = MessageType.RESPONSE 2896 self.oid = SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS 2897 self.gid = GroupId.SESSION_CONFIG 2898 2899 @staticmethod 2900 def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateDtTagRangingRoundsRsp', bytes]: 2901 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_DT_TAG_RANGING_ROUNDS or fields['gid'] != GroupId.SESSION_CONFIG: 2902 raise Exception("Invalid constraint field values") 2903 if len(span) < 2: 2904 raise Exception('Invalid packet size') 2905 fields['status'] = Status.from_int(span[0]) 2906 ranging_round_indexes_count = span[1] 2907 span = span[2:] 2908 if len(span) < ranging_round_indexes_count: 2909 raise Exception('Invalid packet size') 2910 fields['ranging_round_indexes'] = list(span[:ranging_round_indexes_count]) 2911 span = span[ranging_round_indexes_count:] 2912 return SessionUpdateDtTagRangingRoundsRsp(**fields), span 2913 2914 def serialize(self, payload: bytes = None) -> bytes: 2915 _span = bytearray() 2916 _span.append((self.status << 0)) 2917 if len(self.ranging_round_indexes) > 255: 2918 print(f"Invalid length for field SessionUpdateDtTagRangingRoundsRsp::ranging_round_indexes: {len(self.ranging_round_indexes)} > 255; the array will be truncated") 2919 del self.ranging_round_indexes[255:] 2920 _span.append((len(self.ranging_round_indexes) << 0)) 2921 _span.extend(self.ranging_round_indexes) 2922 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 2923 2924 @property 2925 def size(self) -> int: 2926 return len(self.ranging_round_indexes) * 1 + 2 2927 2928@dataclass 2929class Controlee(Packet): 2930 short_address: bytearray = field(kw_only=True, default_factory=bytearray) 2931 subsession_id: int = field(kw_only=True, default=0) 2932 2933 def __post_init__(self): 2934 pass 2935 2936 @staticmethod 2937 def parse(span: bytes) -> Tuple['Controlee', bytes]: 2938 fields = {'payload': None} 2939 if len(span) < 2: 2940 raise Exception('Invalid packet size') 2941 fields['short_address'] = list(span[:2]) 2942 span = span[2:] 2943 if len(span) < 4: 2944 raise Exception('Invalid packet size') 2945 value_ = int.from_bytes(span[0:4], byteorder='little') 2946 fields['subsession_id'] = value_ 2947 span = span[4:] 2948 return Controlee(**fields), span 2949 2950 def serialize(self, payload: bytes = None) -> bytes: 2951 _span = bytearray() 2952 _span.extend(self.short_address) 2953 if self.subsession_id > 4294967295: 2954 print(f"Invalid value for field Controlee::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated") 2955 self.subsession_id &= 4294967295 2956 _span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little')) 2957 return bytes(_span) 2958 2959 @property 2960 def size(self) -> int: 2961 return 6 2962 2963@dataclass 2964class Controlee_V2_0_16_Byte_Version(Packet): 2965 short_address: bytearray = field(kw_only=True, default_factory=bytearray) 2966 subsession_id: int = field(kw_only=True, default=0) 2967 subsession_key: bytearray = field(kw_only=True, default_factory=bytearray) 2968 2969 def __post_init__(self): 2970 pass 2971 2972 @staticmethod 2973 def parse(span: bytes) -> Tuple['Controlee_V2_0_16_Byte_Version', bytes]: 2974 fields = {'payload': None} 2975 if len(span) < 2: 2976 raise Exception('Invalid packet size') 2977 fields['short_address'] = list(span[:2]) 2978 span = span[2:] 2979 if len(span) < 4: 2980 raise Exception('Invalid packet size') 2981 value_ = int.from_bytes(span[0:4], byteorder='little') 2982 fields['subsession_id'] = value_ 2983 span = span[4:] 2984 if len(span) < 16: 2985 raise Exception('Invalid packet size') 2986 fields['subsession_key'] = list(span[:16]) 2987 span = span[16:] 2988 return Controlee_V2_0_16_Byte_Version(**fields), span 2989 2990 def serialize(self, payload: bytes = None) -> bytes: 2991 _span = bytearray() 2992 _span.extend(self.short_address) 2993 if self.subsession_id > 4294967295: 2994 print(f"Invalid value for field Controlee_V2_0_16_Byte_Version::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated") 2995 self.subsession_id &= 4294967295 2996 _span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little')) 2997 _span.extend(self.subsession_key) 2998 return bytes(_span) 2999 3000 @property 3001 def size(self) -> int: 3002 return 22 3003 3004@dataclass 3005class Controlee_V2_0_32_Byte_Version(Packet): 3006 short_address: bytearray = field(kw_only=True, default_factory=bytearray) 3007 subsession_id: int = field(kw_only=True, default=0) 3008 subsession_key: bytearray = field(kw_only=True, default_factory=bytearray) 3009 3010 def __post_init__(self): 3011 pass 3012 3013 @staticmethod 3014 def parse(span: bytes) -> Tuple['Controlee_V2_0_32_Byte_Version', bytes]: 3015 fields = {'payload': None} 3016 if len(span) < 2: 3017 raise Exception('Invalid packet size') 3018 fields['short_address'] = list(span[:2]) 3019 span = span[2:] 3020 if len(span) < 4: 3021 raise Exception('Invalid packet size') 3022 value_ = int.from_bytes(span[0:4], byteorder='little') 3023 fields['subsession_id'] = value_ 3024 span = span[4:] 3025 if len(span) < 32: 3026 raise Exception('Invalid packet size') 3027 fields['subsession_key'] = list(span[:32]) 3028 span = span[32:] 3029 return Controlee_V2_0_32_Byte_Version(**fields), span 3030 3031 def serialize(self, payload: bytes = None) -> bytes: 3032 _span = bytearray() 3033 _span.extend(self.short_address) 3034 if self.subsession_id > 4294967295: 3035 print(f"Invalid value for field Controlee_V2_0_32_Byte_Version::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated") 3036 self.subsession_id &= 4294967295 3037 _span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little')) 3038 _span.extend(self.subsession_key) 3039 return bytes(_span) 3040 3041 @property 3042 def size(self) -> int: 3043 return 38 3044 3045class UpdateMulticastListAction(enum.IntEnum): 3046 ADD_CONTROLEE = 0x0 3047 REMOVE_CONTROLEE = 0x1 3048 ADD_CONTROLEE_WITH_SHORT_SUB_SESSION_KEY = 0x2 3049 ADD_CONTROLEE_WITH_EXTENDED_SUB_SESSION_KEY = 0x3 3050 3051 @staticmethod 3052 def from_int(v: int) -> Union[int, 'UpdateMulticastListAction']: 3053 try: 3054 return UpdateMulticastListAction(v) 3055 except ValueError as exn: 3056 raise exn 3057 3058 3059@dataclass 3060class SessionUpdateControllerMulticastListCmd(SessionConfigPacket): 3061 session_token: int = field(kw_only=True, default=0) 3062 action: UpdateMulticastListAction = field(kw_only=True, default=UpdateMulticastListAction.ADD_CONTROLEE) 3063 3064 def __post_init__(self): 3065 self.mt = MessageType.COMMAND 3066 self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST 3067 self.gid = GroupId.SESSION_CONFIG 3068 3069 @staticmethod 3070 def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd', bytes]: 3071 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG: 3072 raise Exception("Invalid constraint field values") 3073 if len(span) < 5: 3074 raise Exception('Invalid packet size') 3075 value_ = int.from_bytes(span[0:4], byteorder='little') 3076 fields['session_token'] = value_ 3077 fields['action'] = UpdateMulticastListAction.from_int(span[4]) 3078 span = span[5:] 3079 payload = span 3080 span = bytes([]) 3081 fields['payload'] = payload 3082 return SessionUpdateControllerMulticastListCmd(**fields), span 3083 3084 def serialize(self, payload: bytes = None) -> bytes: 3085 _span = bytearray() 3086 if self.session_token > 4294967295: 3087 print(f"Invalid value for field SessionUpdateControllerMulticastListCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") 3088 self.session_token &= 4294967295 3089 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 3090 _span.append((self.action << 0)) 3091 _span.extend(payload or self.payload or []) 3092 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 3093 3094 @property 3095 def size(self) -> int: 3096 return len(self.payload) + 5 3097 3098@dataclass 3099class SessionUpdateControllerMulticastListCmdPayload(Packet): 3100 controlees: List[Controlee] = field(kw_only=True, default_factory=list) 3101 3102 def __post_init__(self): 3103 pass 3104 3105 @staticmethod 3106 def parse(span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmdPayload', bytes]: 3107 fields = {'payload': None} 3108 if len(span) < 1: 3109 raise Exception('Invalid packet size') 3110 controlees_count = span[0] 3111 span = span[1:] 3112 if len(span) < controlees_count * 6: 3113 raise Exception('Invalid packet size') 3114 controlees = [] 3115 for n in range(controlees_count): 3116 controlees.append(Controlee.parse_all(span[n * 6:(n + 1) * 6])) 3117 fields['controlees'] = controlees 3118 span = span[controlees_count * 6:] 3119 return SessionUpdateControllerMulticastListCmdPayload(**fields), span 3120 3121 def serialize(self, payload: bytes = None) -> bytes: 3122 _span = bytearray() 3123 if len(self.controlees) > 255: 3124 print(f"Invalid length for field SessionUpdateControllerMulticastListCmdPayload::controlees: {len(self.controlees)} > 255; the array will be truncated") 3125 del self.controlees[255:] 3126 _span.append((len(self.controlees) << 0)) 3127 for _elt in self.controlees: 3128 _span.extend(_elt.serialize()) 3129 return bytes(_span) 3130 3131 @property 3132 def size(self) -> int: 3133 return sum([elt.size for elt in self.controlees]) + 1 3134 3135@dataclass 3136class SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload(Packet): 3137 controlees: List[Controlee_V2_0_16_Byte_Version] = field(kw_only=True, default_factory=list) 3138 3139 def __post_init__(self): 3140 pass 3141 3142 @staticmethod 3143 def parse(span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload', bytes]: 3144 fields = {'payload': None} 3145 if len(span) < 1: 3146 raise Exception('Invalid packet size') 3147 controlees_count = span[0] 3148 span = span[1:] 3149 if len(span) < controlees_count * 22: 3150 raise Exception('Invalid packet size') 3151 controlees = [] 3152 for n in range(controlees_count): 3153 controlees.append(Controlee_V2_0_16_Byte_Version.parse_all(span[n * 22:(n + 1) * 22])) 3154 fields['controlees'] = controlees 3155 span = span[controlees_count * 22:] 3156 return SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload(**fields), span 3157 3158 def serialize(self, payload: bytes = None) -> bytes: 3159 _span = bytearray() 3160 if len(self.controlees) > 255: 3161 print(f"Invalid length for field SessionUpdateControllerMulticastListCmd_2_0_16_Byte_Payload::controlees: {len(self.controlees)} > 255; the array will be truncated") 3162 del self.controlees[255:] 3163 _span.append((len(self.controlees) << 0)) 3164 for _elt in self.controlees: 3165 _span.extend(_elt.serialize()) 3166 return bytes(_span) 3167 3168 @property 3169 def size(self) -> int: 3170 return sum([elt.size for elt in self.controlees]) + 1 3171 3172@dataclass 3173class SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload(Packet): 3174 controlees: List[Controlee_V2_0_32_Byte_Version] = field(kw_only=True, default_factory=list) 3175 3176 def __post_init__(self): 3177 pass 3178 3179 @staticmethod 3180 def parse(span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload', bytes]: 3181 fields = {'payload': None} 3182 if len(span) < 1: 3183 raise Exception('Invalid packet size') 3184 controlees_count = span[0] 3185 span = span[1:] 3186 if len(span) < controlees_count * 38: 3187 raise Exception('Invalid packet size') 3188 controlees = [] 3189 for n in range(controlees_count): 3190 controlees.append(Controlee_V2_0_32_Byte_Version.parse_all(span[n * 38:(n + 1) * 38])) 3191 fields['controlees'] = controlees 3192 span = span[controlees_count * 38:] 3193 return SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload(**fields), span 3194 3195 def serialize(self, payload: bytes = None) -> bytes: 3196 _span = bytearray() 3197 if len(self.controlees) > 255: 3198 print(f"Invalid length for field SessionUpdateControllerMulticastListCmd_2_0_32_Byte_Payload::controlees: {len(self.controlees)} > 255; the array will be truncated") 3199 del self.controlees[255:] 3200 _span.append((len(self.controlees) << 0)) 3201 for _elt in self.controlees: 3202 _span.extend(_elt.serialize()) 3203 return bytes(_span) 3204 3205 @property 3206 def size(self) -> int: 3207 return sum([elt.size for elt in self.controlees]) + 1 3208 3209@dataclass 3210class SessionUpdateControllerMulticastListRsp(SessionConfigPacket): 3211 status: Status = field(kw_only=True, default=Status.OK) 3212 3213 def __post_init__(self): 3214 self.mt = MessageType.RESPONSE 3215 self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST 3216 self.gid = GroupId.SESSION_CONFIG 3217 3218 @staticmethod 3219 def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListRsp', bytes]: 3220 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG: 3221 raise Exception("Invalid constraint field values") 3222 if len(span) < 1: 3223 raise Exception('Invalid packet size') 3224 fields['status'] = Status.from_int(span[0]) 3225 span = span[1:] 3226 return SessionUpdateControllerMulticastListRsp(**fields), span 3227 3228 def serialize(self, payload: bytes = None) -> bytes: 3229 _span = bytearray() 3230 _span.append((self.status << 0)) 3231 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 3232 3233 @property 3234 def size(self) -> int: 3235 return 1 3236 3237@dataclass 3238class ControleeStatus(Packet): 3239 mac_address: bytearray = field(kw_only=True, default_factory=bytearray) 3240 subsession_id: int = field(kw_only=True, default=0) 3241 status: MulticastUpdateStatus = field(kw_only=True, default=MulticastUpdateStatus.OK_MULTICAST_LIST_UPDATE) 3242 3243 def __post_init__(self): 3244 pass 3245 3246 @staticmethod 3247 def parse(span: bytes) -> Tuple['ControleeStatus', bytes]: 3248 fields = {'payload': None} 3249 if len(span) < 2: 3250 raise Exception('Invalid packet size') 3251 fields['mac_address'] = list(span[:2]) 3252 span = span[2:] 3253 if len(span) < 5: 3254 raise Exception('Invalid packet size') 3255 value_ = int.from_bytes(span[0:4], byteorder='little') 3256 fields['subsession_id'] = value_ 3257 fields['status'] = MulticastUpdateStatus.from_int(span[4]) 3258 span = span[5:] 3259 return ControleeStatus(**fields), span 3260 3261 def serialize(self, payload: bytes = None) -> bytes: 3262 _span = bytearray() 3263 _span.extend(self.mac_address) 3264 if self.subsession_id > 4294967295: 3265 print(f"Invalid value for field ControleeStatus::subsession_id: {self.subsession_id} > 4294967295; the value will be truncated") 3266 self.subsession_id &= 4294967295 3267 _span.extend(int.to_bytes((self.subsession_id << 0), length=4, byteorder='little')) 3268 _span.append((self.status << 0)) 3269 return bytes(_span) 3270 3271 @property 3272 def size(self) -> int: 3273 return 7 3274 3275@dataclass 3276class SessionUpdateControllerMulticastListNtf(SessionConfigPacket): 3277 session_token: int = field(kw_only=True, default=0) 3278 remaining_multicast_list_size: int = field(kw_only=True, default=0) 3279 controlee_status: List[ControleeStatus] = field(kw_only=True, default_factory=list) 3280 3281 def __post_init__(self): 3282 self.mt = MessageType.NOTIFICATION 3283 self.oid = SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST 3284 self.gid = GroupId.SESSION_CONFIG 3285 3286 @staticmethod 3287 def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListNtf', bytes]: 3288 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionConfigOpcodeId.UPDATE_CONTROLLER_MULTICAST_LIST or fields['gid'] != GroupId.SESSION_CONFIG: 3289 raise Exception("Invalid constraint field values") 3290 if len(span) < 6: 3291 raise Exception('Invalid packet size') 3292 value_ = int.from_bytes(span[0:4], byteorder='little') 3293 fields['session_token'] = value_ 3294 fields['remaining_multicast_list_size'] = span[4] 3295 controlee_status_count = span[5] 3296 span = span[6:] 3297 if len(span) < controlee_status_count * 7: 3298 raise Exception('Invalid packet size') 3299 controlee_status = [] 3300 for n in range(controlee_status_count): 3301 controlee_status.append(ControleeStatus.parse_all(span[n * 7:(n + 1) * 7])) 3302 fields['controlee_status'] = controlee_status 3303 span = span[controlee_status_count * 7:] 3304 return SessionUpdateControllerMulticastListNtf(**fields), span 3305 3306 def serialize(self, payload: bytes = None) -> bytes: 3307 _span = bytearray() 3308 if self.session_token > 4294967295: 3309 print(f"Invalid value for field SessionUpdateControllerMulticastListNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") 3310 self.session_token &= 4294967295 3311 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 3312 if self.remaining_multicast_list_size > 255: 3313 print(f"Invalid value for field SessionUpdateControllerMulticastListNtf::remaining_multicast_list_size: {self.remaining_multicast_list_size} > 255; the value will be truncated") 3314 self.remaining_multicast_list_size &= 255 3315 _span.append((self.remaining_multicast_list_size << 0)) 3316 if len(self.controlee_status) > 255: 3317 print(f"Invalid length for field SessionUpdateControllerMulticastListNtf::controlee_status: {len(self.controlee_status)} > 255; the array will be truncated") 3318 del self.controlee_status[255:] 3319 _span.append((len(self.controlee_status) << 0)) 3320 for _elt in self.controlee_status: 3321 _span.extend(_elt.serialize()) 3322 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 3323 3324 @property 3325 def size(self) -> int: 3326 return sum([elt.size for elt in self.controlee_status]) + 6 3327 3328@dataclass 3329class SessionDataCreditNtf(SessionControlPacket): 3330 session_token: int = field(kw_only=True, default=0) 3331 credit_availability: CreditAvailability = field(kw_only=True, default=CreditAvailability.CREDIT_NOT_AVAILABLE) 3332 3333 def __post_init__(self): 3334 self.mt = MessageType.NOTIFICATION 3335 self.oid = SessionControlOpcodeId.DATA_CREDIT 3336 self.gid = GroupId.SESSION_CONTROL 3337 3338 @staticmethod 3339 def parse(fields: dict, span: bytes) -> Tuple['SessionDataCreditNtf', bytes]: 3340 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_CREDIT or fields['gid'] != GroupId.SESSION_CONTROL: 3341 raise Exception("Invalid constraint field values") 3342 if len(span) < 5: 3343 raise Exception('Invalid packet size') 3344 value_ = int.from_bytes(span[0:4], byteorder='little') 3345 fields['session_token'] = value_ 3346 fields['credit_availability'] = CreditAvailability.from_int(span[4]) 3347 span = span[5:] 3348 return SessionDataCreditNtf(**fields), span 3349 3350 def serialize(self, payload: bytes = None) -> bytes: 3351 _span = bytearray() 3352 if self.session_token > 4294967295: 3353 print(f"Invalid value for field SessionDataCreditNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") 3354 self.session_token &= 4294967295 3355 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 3356 _span.append((self.credit_availability << 0)) 3357 return SessionControlPacket.serialize(self, payload = bytes(_span)) 3358 3359 @property 3360 def size(self) -> int: 3361 return 5 3362 3363@dataclass 3364class SessionDataTransferStatusNtf(SessionControlPacket): 3365 session_token: int = field(kw_only=True, default=0) 3366 uci_sequence_number: int = field(kw_only=True, default=0) 3367 status: DataTransferNtfStatusCode = field(kw_only=True, default=DataTransferNtfStatusCode.UCI_DATA_TRANSFER_STATUS_REPETITION_OK) 3368 tx_count: int = field(kw_only=True, default=0) 3369 3370 def __post_init__(self): 3371 self.mt = MessageType.NOTIFICATION 3372 self.oid = SessionControlOpcodeId.DATA_TRANSFER_STATUS 3373 self.gid = GroupId.SESSION_CONTROL 3374 3375 @staticmethod 3376 def parse(fields: dict, span: bytes) -> Tuple['SessionDataTransferStatusNtf', bytes]: 3377 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.DATA_TRANSFER_STATUS or fields['gid'] != GroupId.SESSION_CONTROL: 3378 raise Exception("Invalid constraint field values") 3379 if len(span) < 7: 3380 raise Exception('Invalid packet size') 3381 value_ = int.from_bytes(span[0:4], byteorder='little') 3382 fields['session_token'] = value_ 3383 fields['uci_sequence_number'] = span[4] 3384 fields['status'] = DataTransferNtfStatusCode.from_int(span[5]) 3385 fields['tx_count'] = span[6] 3386 span = span[7:] 3387 return SessionDataTransferStatusNtf(**fields), span 3388 3389 def serialize(self, payload: bytes = None) -> bytes: 3390 _span = bytearray() 3391 if self.session_token > 4294967295: 3392 print(f"Invalid value for field SessionDataTransferStatusNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") 3393 self.session_token &= 4294967295 3394 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 3395 if self.uci_sequence_number > 255: 3396 print(f"Invalid value for field SessionDataTransferStatusNtf::uci_sequence_number: {self.uci_sequence_number} > 255; the value will be truncated") 3397 self.uci_sequence_number &= 255 3398 _span.append((self.uci_sequence_number << 0)) 3399 _span.append((self.status << 0)) 3400 if self.tx_count > 255: 3401 print(f"Invalid value for field SessionDataTransferStatusNtf::tx_count: {self.tx_count} > 255; the value will be truncated") 3402 self.tx_count &= 255 3403 _span.append((self.tx_count << 0)) 3404 return SessionControlPacket.serialize(self, payload = bytes(_span)) 3405 3406 @property 3407 def size(self) -> int: 3408 return 7 3409 3410@dataclass 3411class SessionQueryMaxDataSizeInRangingCmd(SessionConfigPacket): 3412 session_token: int = field(kw_only=True, default=0) 3413 3414 def __post_init__(self): 3415 self.mt = MessageType.COMMAND 3416 self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING 3417 self.gid = GroupId.SESSION_CONFIG 3418 3419 @staticmethod 3420 def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingCmd', bytes]: 3421 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG: 3422 raise Exception("Invalid constraint field values") 3423 if len(span) < 4: 3424 raise Exception('Invalid packet size') 3425 value_ = int.from_bytes(span[0:4], byteorder='little') 3426 fields['session_token'] = value_ 3427 span = span[4:] 3428 return SessionQueryMaxDataSizeInRangingCmd(**fields), span 3429 3430 def serialize(self, payload: bytes = None) -> bytes: 3431 _span = bytearray() 3432 if self.session_token > 4294967295: 3433 print(f"Invalid value for field SessionQueryMaxDataSizeInRangingCmd::session_token: {self.session_token} > 4294967295; the value will be truncated") 3434 self.session_token &= 4294967295 3435 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 3436 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 3437 3438 @property 3439 def size(self) -> int: 3440 return 4 3441 3442@dataclass 3443class SessionQueryMaxDataSizeInRangingRsp(SessionConfigPacket): 3444 session_token: int = field(kw_only=True, default=0) 3445 max_data_size: int = field(kw_only=True, default=0) 3446 3447 def __post_init__(self): 3448 self.mt = MessageType.RESPONSE 3449 self.oid = SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING 3450 self.gid = GroupId.SESSION_CONFIG 3451 3452 @staticmethod 3453 def parse(fields: dict, span: bytes) -> Tuple['SessionQueryMaxDataSizeInRangingRsp', bytes]: 3454 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionConfigOpcodeId.QUERY_DATA_SIZE_IN_RANGING or fields['gid'] != GroupId.SESSION_CONFIG: 3455 raise Exception("Invalid constraint field values") 3456 if len(span) < 6: 3457 raise Exception('Invalid packet size') 3458 value_ = int.from_bytes(span[0:4], byteorder='little') 3459 fields['session_token'] = value_ 3460 value_ = int.from_bytes(span[4:6], byteorder='little') 3461 fields['max_data_size'] = value_ 3462 span = span[6:] 3463 return SessionQueryMaxDataSizeInRangingRsp(**fields), span 3464 3465 def serialize(self, payload: bytes = None) -> bytes: 3466 _span = bytearray() 3467 if self.session_token > 4294967295: 3468 print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::session_token: {self.session_token} > 4294967295; the value will be truncated") 3469 self.session_token &= 4294967295 3470 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 3471 if self.max_data_size > 65535: 3472 print(f"Invalid value for field SessionQueryMaxDataSizeInRangingRsp::max_data_size: {self.max_data_size} > 65535; the value will be truncated") 3473 self.max_data_size &= 65535 3474 _span.extend(int.to_bytes((self.max_data_size << 0), length=2, byteorder='little')) 3475 return SessionConfigPacket.serialize(self, payload = bytes(_span)) 3476 3477 @property 3478 def size(self) -> int: 3479 return 6 3480 3481@dataclass 3482class SessionStartCmd(SessionControlPacket): 3483 session_id: int = field(kw_only=True, default=0) 3484 3485 def __post_init__(self): 3486 self.mt = MessageType.COMMAND 3487 self.oid = SessionControlOpcodeId.START 3488 self.gid = GroupId.SESSION_CONTROL 3489 3490 @staticmethod 3491 def parse(fields: dict, span: bytes) -> Tuple['SessionStartCmd', bytes]: 3492 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 3493 raise Exception("Invalid constraint field values") 3494 if len(span) < 4: 3495 raise Exception('Invalid packet size') 3496 value_ = int.from_bytes(span[0:4], byteorder='little') 3497 fields['session_id'] = value_ 3498 span = span[4:] 3499 return SessionStartCmd(**fields), span 3500 3501 def serialize(self, payload: bytes = None) -> bytes: 3502 _span = bytearray() 3503 if self.session_id > 4294967295: 3504 print(f"Invalid value for field SessionStartCmd::session_id: {self.session_id} > 4294967295; the value will be truncated") 3505 self.session_id &= 4294967295 3506 _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) 3507 return SessionControlPacket.serialize(self, payload = bytes(_span)) 3508 3509 @property 3510 def size(self) -> int: 3511 return 4 3512 3513@dataclass 3514class SessionStartRsp(SessionControlPacket): 3515 status: Status = field(kw_only=True, default=Status.OK) 3516 3517 def __post_init__(self): 3518 self.mt = MessageType.RESPONSE 3519 self.oid = SessionControlOpcodeId.START 3520 self.gid = GroupId.SESSION_CONTROL 3521 3522 @staticmethod 3523 def parse(fields: dict, span: bytes) -> Tuple['SessionStartRsp', bytes]: 3524 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 3525 raise Exception("Invalid constraint field values") 3526 if len(span) < 1: 3527 raise Exception('Invalid packet size') 3528 fields['status'] = Status.from_int(span[0]) 3529 span = span[1:] 3530 return SessionStartRsp(**fields), span 3531 3532 def serialize(self, payload: bytes = None) -> bytes: 3533 _span = bytearray() 3534 _span.append((self.status << 0)) 3535 return SessionControlPacket.serialize(self, payload = bytes(_span)) 3536 3537 @property 3538 def size(self) -> int: 3539 return 1 3540 3541@dataclass 3542class ShortAddressTwoWayRangingMeasurement(Packet): 3543 mac_address: int = field(kw_only=True, default=0) 3544 status: Status = field(kw_only=True, default=Status.OK) 3545 nlos: int = field(kw_only=True, default=0) 3546 distance: int = field(kw_only=True, default=0) 3547 aoa_azimuth: int = field(kw_only=True, default=0) 3548 aoa_azimuth_fom: int = field(kw_only=True, default=0) 3549 aoa_elevation: int = field(kw_only=True, default=0) 3550 aoa_elevation_fom: int = field(kw_only=True, default=0) 3551 aoa_destination_azimuth: int = field(kw_only=True, default=0) 3552 aoa_destination_azimuth_fom: int = field(kw_only=True, default=0) 3553 aoa_destination_elevation: int = field(kw_only=True, default=0) 3554 aoa_destination_elevation_fom: int = field(kw_only=True, default=0) 3555 slot_index: int = field(kw_only=True, default=0) 3556 rssi: int = field(kw_only=True, default=0) 3557 3558 def __post_init__(self): 3559 pass 3560 3561 @staticmethod 3562 def parse(span: bytes) -> Tuple['ShortAddressTwoWayRangingMeasurement', bytes]: 3563 fields = {'payload': None} 3564 if len(span) < 31: 3565 raise Exception('Invalid packet size') 3566 value_ = int.from_bytes(span[0:2], byteorder='little') 3567 fields['mac_address'] = value_ 3568 fields['status'] = Status.from_int(span[2]) 3569 fields['nlos'] = span[3] 3570 value_ = int.from_bytes(span[4:6], byteorder='little') 3571 fields['distance'] = value_ 3572 value_ = int.from_bytes(span[6:8], byteorder='little') 3573 fields['aoa_azimuth'] = value_ 3574 fields['aoa_azimuth_fom'] = span[8] 3575 value_ = int.from_bytes(span[9:11], byteorder='little') 3576 fields['aoa_elevation'] = value_ 3577 fields['aoa_elevation_fom'] = span[11] 3578 value_ = int.from_bytes(span[12:14], byteorder='little') 3579 fields['aoa_destination_azimuth'] = value_ 3580 fields['aoa_destination_azimuth_fom'] = span[14] 3581 value_ = int.from_bytes(span[15:17], byteorder='little') 3582 fields['aoa_destination_elevation'] = value_ 3583 fields['aoa_destination_elevation_fom'] = span[17] 3584 fields['slot_index'] = span[18] 3585 fields['rssi'] = span[19] 3586 value_ = int.from_bytes(span[20:28], byteorder='little') 3587 value_ = int.from_bytes(span[28:31], byteorder='little') 3588 span = span[31:] 3589 return ShortAddressTwoWayRangingMeasurement(**fields), span 3590 3591 def serialize(self, payload: bytes = None) -> bytes: 3592 _span = bytearray() 3593 if self.mac_address > 65535: 3594 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::mac_address: {self.mac_address} > 65535; the value will be truncated") 3595 self.mac_address &= 65535 3596 _span.extend(int.to_bytes((self.mac_address << 0), length=2, byteorder='little')) 3597 _span.append((self.status << 0)) 3598 if self.nlos > 255: 3599 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated") 3600 self.nlos &= 255 3601 _span.append((self.nlos << 0)) 3602 if self.distance > 65535: 3603 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::distance: {self.distance} > 65535; the value will be truncated") 3604 self.distance &= 65535 3605 _span.extend(int.to_bytes((self.distance << 0), length=2, byteorder='little')) 3606 if self.aoa_azimuth > 65535: 3607 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated") 3608 self.aoa_azimuth &= 65535 3609 _span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little')) 3610 if self.aoa_azimuth_fom > 255: 3611 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated") 3612 self.aoa_azimuth_fom &= 255 3613 _span.append((self.aoa_azimuth_fom << 0)) 3614 if self.aoa_elevation > 65535: 3615 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated") 3616 self.aoa_elevation &= 65535 3617 _span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little')) 3618 if self.aoa_elevation_fom > 255: 3619 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated") 3620 self.aoa_elevation_fom &= 255 3621 _span.append((self.aoa_elevation_fom << 0)) 3622 if self.aoa_destination_azimuth > 65535: 3623 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_azimuth: {self.aoa_destination_azimuth} > 65535; the value will be truncated") 3624 self.aoa_destination_azimuth &= 65535 3625 _span.extend(int.to_bytes((self.aoa_destination_azimuth << 0), length=2, byteorder='little')) 3626 if self.aoa_destination_azimuth_fom > 255: 3627 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_azimuth_fom: {self.aoa_destination_azimuth_fom} > 255; the value will be truncated") 3628 self.aoa_destination_azimuth_fom &= 255 3629 _span.append((self.aoa_destination_azimuth_fom << 0)) 3630 if self.aoa_destination_elevation > 65535: 3631 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_elevation: {self.aoa_destination_elevation} > 65535; the value will be truncated") 3632 self.aoa_destination_elevation &= 65535 3633 _span.extend(int.to_bytes((self.aoa_destination_elevation << 0), length=2, byteorder='little')) 3634 if self.aoa_destination_elevation_fom > 255: 3635 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::aoa_destination_elevation_fom: {self.aoa_destination_elevation_fom} > 255; the value will be truncated") 3636 self.aoa_destination_elevation_fom &= 255 3637 _span.append((self.aoa_destination_elevation_fom << 0)) 3638 if self.slot_index > 255: 3639 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::slot_index: {self.slot_index} > 255; the value will be truncated") 3640 self.slot_index &= 255 3641 _span.append((self.slot_index << 0)) 3642 if self.rssi > 255: 3643 print(f"Invalid value for field ShortAddressTwoWayRangingMeasurement::rssi: {self.rssi} > 255; the value will be truncated") 3644 self.rssi &= 255 3645 _span.append((self.rssi << 0)) 3646 _span.extend([0] * 8) 3647 _span.extend([0] * 3) 3648 return bytes(_span) 3649 3650 @property 3651 def size(self) -> int: 3652 return 31 3653 3654@dataclass 3655class ExtendedAddressTwoWayRangingMeasurement(Packet): 3656 mac_address: int = field(kw_only=True, default=0) 3657 status: Status = field(kw_only=True, default=Status.OK) 3658 nlos: int = field(kw_only=True, default=0) 3659 distance: int = field(kw_only=True, default=0) 3660 aoa_azimuth: int = field(kw_only=True, default=0) 3661 aoa_azimuth_fom: int = field(kw_only=True, default=0) 3662 aoa_elevation: int = field(kw_only=True, default=0) 3663 aoa_elevation_fom: int = field(kw_only=True, default=0) 3664 aoa_destination_azimuth: int = field(kw_only=True, default=0) 3665 aoa_destination_azimuth_fom: int = field(kw_only=True, default=0) 3666 aoa_destination_elevation: int = field(kw_only=True, default=0) 3667 aoa_destination_elevation_fom: int = field(kw_only=True, default=0) 3668 slot_index: int = field(kw_only=True, default=0) 3669 rssi: int = field(kw_only=True, default=0) 3670 3671 def __post_init__(self): 3672 pass 3673 3674 @staticmethod 3675 def parse(span: bytes) -> Tuple['ExtendedAddressTwoWayRangingMeasurement', bytes]: 3676 fields = {'payload': None} 3677 if len(span) < 31: 3678 raise Exception('Invalid packet size') 3679 value_ = int.from_bytes(span[0:8], byteorder='little') 3680 fields['mac_address'] = value_ 3681 fields['status'] = Status.from_int(span[8]) 3682 fields['nlos'] = span[9] 3683 value_ = int.from_bytes(span[10:12], byteorder='little') 3684 fields['distance'] = value_ 3685 value_ = int.from_bytes(span[12:14], byteorder='little') 3686 fields['aoa_azimuth'] = value_ 3687 fields['aoa_azimuth_fom'] = span[14] 3688 value_ = int.from_bytes(span[15:17], byteorder='little') 3689 fields['aoa_elevation'] = value_ 3690 fields['aoa_elevation_fom'] = span[17] 3691 value_ = int.from_bytes(span[18:20], byteorder='little') 3692 fields['aoa_destination_azimuth'] = value_ 3693 fields['aoa_destination_azimuth_fom'] = span[20] 3694 value_ = int.from_bytes(span[21:23], byteorder='little') 3695 fields['aoa_destination_elevation'] = value_ 3696 fields['aoa_destination_elevation_fom'] = span[23] 3697 fields['slot_index'] = span[24] 3698 fields['rssi'] = span[25] 3699 value_ = int.from_bytes(span[26:31], byteorder='little') 3700 span = span[31:] 3701 return ExtendedAddressTwoWayRangingMeasurement(**fields), span 3702 3703 def serialize(self, payload: bytes = None) -> bytes: 3704 _span = bytearray() 3705 if self.mac_address > 18446744073709551615: 3706 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::mac_address: {self.mac_address} > 18446744073709551615; the value will be truncated") 3707 self.mac_address &= 18446744073709551615 3708 _span.extend(int.to_bytes((self.mac_address << 0), length=8, byteorder='little')) 3709 _span.append((self.status << 0)) 3710 if self.nlos > 255: 3711 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated") 3712 self.nlos &= 255 3713 _span.append((self.nlos << 0)) 3714 if self.distance > 65535: 3715 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::distance: {self.distance} > 65535; the value will be truncated") 3716 self.distance &= 65535 3717 _span.extend(int.to_bytes((self.distance << 0), length=2, byteorder='little')) 3718 if self.aoa_azimuth > 65535: 3719 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated") 3720 self.aoa_azimuth &= 65535 3721 _span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little')) 3722 if self.aoa_azimuth_fom > 255: 3723 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated") 3724 self.aoa_azimuth_fom &= 255 3725 _span.append((self.aoa_azimuth_fom << 0)) 3726 if self.aoa_elevation > 65535: 3727 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated") 3728 self.aoa_elevation &= 65535 3729 _span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little')) 3730 if self.aoa_elevation_fom > 255: 3731 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated") 3732 self.aoa_elevation_fom &= 255 3733 _span.append((self.aoa_elevation_fom << 0)) 3734 if self.aoa_destination_azimuth > 65535: 3735 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_azimuth: {self.aoa_destination_azimuth} > 65535; the value will be truncated") 3736 self.aoa_destination_azimuth &= 65535 3737 _span.extend(int.to_bytes((self.aoa_destination_azimuth << 0), length=2, byteorder='little')) 3738 if self.aoa_destination_azimuth_fom > 255: 3739 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_azimuth_fom: {self.aoa_destination_azimuth_fom} > 255; the value will be truncated") 3740 self.aoa_destination_azimuth_fom &= 255 3741 _span.append((self.aoa_destination_azimuth_fom << 0)) 3742 if self.aoa_destination_elevation > 65535: 3743 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_elevation: {self.aoa_destination_elevation} > 65535; the value will be truncated") 3744 self.aoa_destination_elevation &= 65535 3745 _span.extend(int.to_bytes((self.aoa_destination_elevation << 0), length=2, byteorder='little')) 3746 if self.aoa_destination_elevation_fom > 255: 3747 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::aoa_destination_elevation_fom: {self.aoa_destination_elevation_fom} > 255; the value will be truncated") 3748 self.aoa_destination_elevation_fom &= 255 3749 _span.append((self.aoa_destination_elevation_fom << 0)) 3750 if self.slot_index > 255: 3751 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::slot_index: {self.slot_index} > 255; the value will be truncated") 3752 self.slot_index &= 255 3753 _span.append((self.slot_index << 0)) 3754 if self.rssi > 255: 3755 print(f"Invalid value for field ExtendedAddressTwoWayRangingMeasurement::rssi: {self.rssi} > 255; the value will be truncated") 3756 self.rssi &= 255 3757 _span.append((self.rssi << 0)) 3758 _span.extend([0] * 5) 3759 return bytes(_span) 3760 3761 @property 3762 def size(self) -> int: 3763 return 31 3764 3765@dataclass 3766class ShortAddressOwrAoaRangingMeasurement(Packet): 3767 mac_address: int = field(kw_only=True, default=0) 3768 status: Status = field(kw_only=True, default=Status.OK) 3769 nlos: int = field(kw_only=True, default=0) 3770 frame_sequence_number: int = field(kw_only=True, default=0) 3771 block_index: int = field(kw_only=True, default=0) 3772 aoa_azimuth: int = field(kw_only=True, default=0) 3773 aoa_azimuth_fom: int = field(kw_only=True, default=0) 3774 aoa_elevation: int = field(kw_only=True, default=0) 3775 aoa_elevation_fom: int = field(kw_only=True, default=0) 3776 3777 def __post_init__(self): 3778 pass 3779 3780 @staticmethod 3781 def parse(span: bytes) -> Tuple['ShortAddressOwrAoaRangingMeasurement', bytes]: 3782 fields = {'payload': None} 3783 if len(span) < 13: 3784 raise Exception('Invalid packet size') 3785 value_ = int.from_bytes(span[0:2], byteorder='little') 3786 fields['mac_address'] = value_ 3787 fields['status'] = Status.from_int(span[2]) 3788 fields['nlos'] = span[3] 3789 fields['frame_sequence_number'] = span[4] 3790 value_ = int.from_bytes(span[5:7], byteorder='little') 3791 fields['block_index'] = value_ 3792 value_ = int.from_bytes(span[7:9], byteorder='little') 3793 fields['aoa_azimuth'] = value_ 3794 fields['aoa_azimuth_fom'] = span[9] 3795 value_ = int.from_bytes(span[10:12], byteorder='little') 3796 fields['aoa_elevation'] = value_ 3797 fields['aoa_elevation_fom'] = span[12] 3798 span = span[13:] 3799 return ShortAddressOwrAoaRangingMeasurement(**fields), span 3800 3801 def serialize(self, payload: bytes = None) -> bytes: 3802 _span = bytearray() 3803 if self.mac_address > 65535: 3804 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::mac_address: {self.mac_address} > 65535; the value will be truncated") 3805 self.mac_address &= 65535 3806 _span.extend(int.to_bytes((self.mac_address << 0), length=2, byteorder='little')) 3807 _span.append((self.status << 0)) 3808 if self.nlos > 255: 3809 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated") 3810 self.nlos &= 255 3811 _span.append((self.nlos << 0)) 3812 if self.frame_sequence_number > 255: 3813 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::frame_sequence_number: {self.frame_sequence_number} > 255; the value will be truncated") 3814 self.frame_sequence_number &= 255 3815 _span.append((self.frame_sequence_number << 0)) 3816 if self.block_index > 65535: 3817 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::block_index: {self.block_index} > 65535; the value will be truncated") 3818 self.block_index &= 65535 3819 _span.extend(int.to_bytes((self.block_index << 0), length=2, byteorder='little')) 3820 if self.aoa_azimuth > 65535: 3821 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated") 3822 self.aoa_azimuth &= 65535 3823 _span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little')) 3824 if self.aoa_azimuth_fom > 255: 3825 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated") 3826 self.aoa_azimuth_fom &= 255 3827 _span.append((self.aoa_azimuth_fom << 0)) 3828 if self.aoa_elevation > 65535: 3829 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated") 3830 self.aoa_elevation &= 65535 3831 _span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little')) 3832 if self.aoa_elevation_fom > 255: 3833 print(f"Invalid value for field ShortAddressOwrAoaRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated") 3834 self.aoa_elevation_fom &= 255 3835 _span.append((self.aoa_elevation_fom << 0)) 3836 return bytes(_span) 3837 3838 @property 3839 def size(self) -> int: 3840 return 13 3841 3842@dataclass 3843class ExtendedAddressOwrAoaRangingMeasurement(Packet): 3844 mac_address: int = field(kw_only=True, default=0) 3845 status: Status = field(kw_only=True, default=Status.OK) 3846 nlos: int = field(kw_only=True, default=0) 3847 frame_sequence_number: int = field(kw_only=True, default=0) 3848 block_index: int = field(kw_only=True, default=0) 3849 aoa_azimuth: int = field(kw_only=True, default=0) 3850 aoa_azimuth_fom: int = field(kw_only=True, default=0) 3851 aoa_elevation: int = field(kw_only=True, default=0) 3852 aoa_elevation_fom: int = field(kw_only=True, default=0) 3853 3854 def __post_init__(self): 3855 pass 3856 3857 @staticmethod 3858 def parse(span: bytes) -> Tuple['ExtendedAddressOwrAoaRangingMeasurement', bytes]: 3859 fields = {'payload': None} 3860 if len(span) < 19: 3861 raise Exception('Invalid packet size') 3862 value_ = int.from_bytes(span[0:8], byteorder='little') 3863 fields['mac_address'] = value_ 3864 fields['status'] = Status.from_int(span[8]) 3865 fields['nlos'] = span[9] 3866 fields['frame_sequence_number'] = span[10] 3867 value_ = int.from_bytes(span[11:13], byteorder='little') 3868 fields['block_index'] = value_ 3869 value_ = int.from_bytes(span[13:15], byteorder='little') 3870 fields['aoa_azimuth'] = value_ 3871 fields['aoa_azimuth_fom'] = span[15] 3872 value_ = int.from_bytes(span[16:18], byteorder='little') 3873 fields['aoa_elevation'] = value_ 3874 fields['aoa_elevation_fom'] = span[18] 3875 span = span[19:] 3876 return ExtendedAddressOwrAoaRangingMeasurement(**fields), span 3877 3878 def serialize(self, payload: bytes = None) -> bytes: 3879 _span = bytearray() 3880 if self.mac_address > 18446744073709551615: 3881 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::mac_address: {self.mac_address} > 18446744073709551615; the value will be truncated") 3882 self.mac_address &= 18446744073709551615 3883 _span.extend(int.to_bytes((self.mac_address << 0), length=8, byteorder='little')) 3884 _span.append((self.status << 0)) 3885 if self.nlos > 255: 3886 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::nlos: {self.nlos} > 255; the value will be truncated") 3887 self.nlos &= 255 3888 _span.append((self.nlos << 0)) 3889 if self.frame_sequence_number > 255: 3890 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::frame_sequence_number: {self.frame_sequence_number} > 255; the value will be truncated") 3891 self.frame_sequence_number &= 255 3892 _span.append((self.frame_sequence_number << 0)) 3893 if self.block_index > 65535: 3894 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::block_index: {self.block_index} > 65535; the value will be truncated") 3895 self.block_index &= 65535 3896 _span.extend(int.to_bytes((self.block_index << 0), length=2, byteorder='little')) 3897 if self.aoa_azimuth > 65535: 3898 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_azimuth: {self.aoa_azimuth} > 65535; the value will be truncated") 3899 self.aoa_azimuth &= 65535 3900 _span.extend(int.to_bytes((self.aoa_azimuth << 0), length=2, byteorder='little')) 3901 if self.aoa_azimuth_fom > 255: 3902 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_azimuth_fom: {self.aoa_azimuth_fom} > 255; the value will be truncated") 3903 self.aoa_azimuth_fom &= 255 3904 _span.append((self.aoa_azimuth_fom << 0)) 3905 if self.aoa_elevation > 65535: 3906 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_elevation: {self.aoa_elevation} > 65535; the value will be truncated") 3907 self.aoa_elevation &= 65535 3908 _span.extend(int.to_bytes((self.aoa_elevation << 0), length=2, byteorder='little')) 3909 if self.aoa_elevation_fom > 255: 3910 print(f"Invalid value for field ExtendedAddressOwrAoaRangingMeasurement::aoa_elevation_fom: {self.aoa_elevation_fom} > 255; the value will be truncated") 3911 self.aoa_elevation_fom &= 255 3912 _span.append((self.aoa_elevation_fom << 0)) 3913 return bytes(_span) 3914 3915 @property 3916 def size(self) -> int: 3917 return 19 3918 3919class RangingMeasurementType(enum.IntEnum): 3920 ONE_WAY = 0x0 3921 TWO_WAY = 0x1 3922 DL_TDOA = 0x2 3923 OWR_AOA = 0x3 3924 3925 @staticmethod 3926 def from_int(v: int) -> Union[int, 'RangingMeasurementType']: 3927 try: 3928 return RangingMeasurementType(v) 3929 except ValueError as exn: 3930 raise exn 3931 3932 3933@dataclass 3934class SessionInfoNtf(SessionControlPacket): 3935 sequence_number: int = field(kw_only=True, default=0) 3936 session_token: int = field(kw_only=True, default=0) 3937 rcr_indicator: int = field(kw_only=True, default=0) 3938 current_ranging_interval: int = field(kw_only=True, default=0) 3939 ranging_measurement_type: RangingMeasurementType = field(kw_only=True, default=RangingMeasurementType.ONE_WAY) 3940 mac_address_indicator: MacAddressIndicator = field(kw_only=True, default=MacAddressIndicator.SHORT_ADDRESS) 3941 3942 def __post_init__(self): 3943 self.mt = MessageType.NOTIFICATION 3944 self.oid = SessionControlOpcodeId.START 3945 self.gid = GroupId.SESSION_CONTROL 3946 3947 @staticmethod 3948 def parse(fields: dict, span: bytes) -> Tuple['SessionInfoNtf', bytes]: 3949 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 3950 raise Exception("Invalid constraint field values") 3951 if len(span) < 24: 3952 raise Exception('Invalid packet size') 3953 value_ = int.from_bytes(span[0:4], byteorder='little') 3954 fields['sequence_number'] = value_ 3955 value_ = int.from_bytes(span[4:8], byteorder='little') 3956 fields['session_token'] = value_ 3957 fields['rcr_indicator'] = span[8] 3958 value_ = int.from_bytes(span[9:13], byteorder='little') 3959 fields['current_ranging_interval'] = value_ 3960 fields['ranging_measurement_type'] = RangingMeasurementType.from_int(span[13]) 3961 fields['mac_address_indicator'] = MacAddressIndicator.from_int(span[15]) 3962 value_ = int.from_bytes(span[16:24], byteorder='little') 3963 span = span[24:] 3964 payload = span 3965 span = bytes([]) 3966 fields['payload'] = payload 3967 try: 3968 return ShortMacTwoWaySessionInfoNtf.parse(fields.copy(), payload) 3969 except Exception as exn: 3970 pass 3971 try: 3972 return ExtendedMacTwoWaySessionInfoNtf.parse(fields.copy(), payload) 3973 except Exception as exn: 3974 pass 3975 try: 3976 return ShortMacDlTDoASessionInfoNtf.parse(fields.copy(), payload) 3977 except Exception as exn: 3978 pass 3979 try: 3980 return ExtendedMacDlTDoASessionInfoNtf.parse(fields.copy(), payload) 3981 except Exception as exn: 3982 pass 3983 try: 3984 return ShortMacOwrAoaSessionInfoNtf.parse(fields.copy(), payload) 3985 except Exception as exn: 3986 pass 3987 try: 3988 return ExtendedMacOwrAoaSessionInfoNtf.parse(fields.copy(), payload) 3989 except Exception as exn: 3990 pass 3991 return SessionInfoNtf(**fields), span 3992 3993 def serialize(self, payload: bytes = None) -> bytes: 3994 _span = bytearray() 3995 if self.sequence_number > 4294967295: 3996 print(f"Invalid value for field SessionInfoNtf::sequence_number: {self.sequence_number} > 4294967295; the value will be truncated") 3997 self.sequence_number &= 4294967295 3998 _span.extend(int.to_bytes((self.sequence_number << 0), length=4, byteorder='little')) 3999 if self.session_token > 4294967295: 4000 print(f"Invalid value for field SessionInfoNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") 4001 self.session_token &= 4294967295 4002 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 4003 if self.rcr_indicator > 255: 4004 print(f"Invalid value for field SessionInfoNtf::rcr_indicator: {self.rcr_indicator} > 255; the value will be truncated") 4005 self.rcr_indicator &= 255 4006 _span.append((self.rcr_indicator << 0)) 4007 if self.current_ranging_interval > 4294967295: 4008 print(f"Invalid value for field SessionInfoNtf::current_ranging_interval: {self.current_ranging_interval} > 4294967295; the value will be truncated") 4009 self.current_ranging_interval &= 4294967295 4010 _span.extend(int.to_bytes((self.current_ranging_interval << 0), length=4, byteorder='little')) 4011 _span.append((self.ranging_measurement_type << 0)) 4012 _span.extend([0] * 1) 4013 _span.append((self.mac_address_indicator << 0)) 4014 _span.extend([0] * 8) 4015 _span.extend(payload or self.payload or []) 4016 return SessionControlPacket.serialize(self, payload = bytes(_span)) 4017 4018 @property 4019 def size(self) -> int: 4020 return len(self.payload) + 24 4021 4022@dataclass 4023class ShortMacTwoWaySessionInfoNtf(SessionInfoNtf): 4024 two_way_ranging_measurements: List[ShortAddressTwoWayRangingMeasurement] = field(kw_only=True, default_factory=list) 4025 vendor_data: bytearray = field(kw_only=True, default_factory=bytearray) 4026 4027 def __post_init__(self): 4028 self.ranging_measurement_type = RangingMeasurementType.TWO_WAY 4029 self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS 4030 self.mt = MessageType.NOTIFICATION 4031 self.oid = SessionControlOpcodeId.START 4032 self.gid = GroupId.SESSION_CONTROL 4033 4034 @staticmethod 4035 def parse(fields: dict, span: bytes) -> Tuple['ShortMacTwoWaySessionInfoNtf', bytes]: 4036 if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 4037 raise Exception("Invalid constraint field values") 4038 if len(span) < 1: 4039 raise Exception('Invalid packet size') 4040 two_way_ranging_measurements_count = span[0] 4041 span = span[1:] 4042 if len(span) < two_way_ranging_measurements_count * 31: 4043 raise Exception('Invalid packet size') 4044 two_way_ranging_measurements = [] 4045 for n in range(two_way_ranging_measurements_count): 4046 two_way_ranging_measurements.append(ShortAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31])) 4047 fields['two_way_ranging_measurements'] = two_way_ranging_measurements 4048 span = span[two_way_ranging_measurements_count * 31:] 4049 fields['vendor_data'] = list(span) 4050 span = bytes() 4051 return ShortMacTwoWaySessionInfoNtf(**fields), span 4052 4053 def serialize(self, payload: bytes = None) -> bytes: 4054 _span = bytearray() 4055 if len(self.two_way_ranging_measurements) > 255: 4056 print(f"Invalid length for field ShortMacTwoWaySessionInfoNtf::two_way_ranging_measurements: {len(self.two_way_ranging_measurements)} > 255; the array will be truncated") 4057 del self.two_way_ranging_measurements[255:] 4058 _span.append((len(self.two_way_ranging_measurements) << 0)) 4059 for _elt in self.two_way_ranging_measurements: 4060 _span.extend(_elt.serialize()) 4061 _span.extend(self.vendor_data) 4062 return SessionInfoNtf.serialize(self, payload = bytes(_span)) 4063 4064 @property 4065 def size(self) -> int: 4066 return 1 + ( 4067 sum([elt.size for elt in self.two_way_ranging_measurements]) + 4068 len(self.vendor_data) * 1 4069 ) 4070 4071@dataclass 4072class ExtendedMacTwoWaySessionInfoNtf(SessionInfoNtf): 4073 two_way_ranging_measurements: List[ExtendedAddressTwoWayRangingMeasurement] = field(kw_only=True, default_factory=list) 4074 vendor_data: bytearray = field(kw_only=True, default_factory=bytearray) 4075 4076 def __post_init__(self): 4077 self.ranging_measurement_type = RangingMeasurementType.TWO_WAY 4078 self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS 4079 self.mt = MessageType.NOTIFICATION 4080 self.oid = SessionControlOpcodeId.START 4081 self.gid = GroupId.SESSION_CONTROL 4082 4083 @staticmethod 4084 def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacTwoWaySessionInfoNtf', bytes]: 4085 if fields['ranging_measurement_type'] != RangingMeasurementType.TWO_WAY or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 4086 raise Exception("Invalid constraint field values") 4087 if len(span) < 1: 4088 raise Exception('Invalid packet size') 4089 two_way_ranging_measurements_count = span[0] 4090 span = span[1:] 4091 if len(span) < two_way_ranging_measurements_count * 31: 4092 raise Exception('Invalid packet size') 4093 two_way_ranging_measurements = [] 4094 for n in range(two_way_ranging_measurements_count): 4095 two_way_ranging_measurements.append(ExtendedAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31])) 4096 fields['two_way_ranging_measurements'] = two_way_ranging_measurements 4097 span = span[two_way_ranging_measurements_count * 31:] 4098 fields['vendor_data'] = list(span) 4099 span = bytes() 4100 return ExtendedMacTwoWaySessionInfoNtf(**fields), span 4101 4102 def serialize(self, payload: bytes = None) -> bytes: 4103 _span = bytearray() 4104 if len(self.two_way_ranging_measurements) > 255: 4105 print(f"Invalid length for field ExtendedMacTwoWaySessionInfoNtf::two_way_ranging_measurements: {len(self.two_way_ranging_measurements)} > 255; the array will be truncated") 4106 del self.two_way_ranging_measurements[255:] 4107 _span.append((len(self.two_way_ranging_measurements) << 0)) 4108 for _elt in self.two_way_ranging_measurements: 4109 _span.extend(_elt.serialize()) 4110 _span.extend(self.vendor_data) 4111 return SessionInfoNtf.serialize(self, payload = bytes(_span)) 4112 4113 @property 4114 def size(self) -> int: 4115 return 1 + ( 4116 sum([elt.size for elt in self.two_way_ranging_measurements]) + 4117 len(self.vendor_data) * 1 4118 ) 4119 4120@dataclass 4121class ShortMacDlTDoASessionInfoNtf(SessionInfoNtf): 4122 no_of_ranging_measurements: int = field(kw_only=True, default=0) 4123 dl_tdoa_measurements: bytearray = field(kw_only=True, default_factory=bytearray) 4124 4125 def __post_init__(self): 4126 self.ranging_measurement_type = RangingMeasurementType.DL_TDOA 4127 self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS 4128 self.mt = MessageType.NOTIFICATION 4129 self.oid = SessionControlOpcodeId.START 4130 self.gid = GroupId.SESSION_CONTROL 4131 4132 @staticmethod 4133 def parse(fields: dict, span: bytes) -> Tuple['ShortMacDlTDoASessionInfoNtf', bytes]: 4134 if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 4135 raise Exception("Invalid constraint field values") 4136 if len(span) < 1: 4137 raise Exception('Invalid packet size') 4138 fields['no_of_ranging_measurements'] = span[0] 4139 span = span[1:] 4140 fields['dl_tdoa_measurements'] = list(span) 4141 span = bytes() 4142 return ShortMacDlTDoASessionInfoNtf(**fields), span 4143 4144 def serialize(self, payload: bytes = None) -> bytes: 4145 _span = bytearray() 4146 if self.no_of_ranging_measurements > 255: 4147 print(f"Invalid value for field ShortMacDlTDoASessionInfoNtf::no_of_ranging_measurements: {self.no_of_ranging_measurements} > 255; the value will be truncated") 4148 self.no_of_ranging_measurements &= 255 4149 _span.append((self.no_of_ranging_measurements << 0)) 4150 _span.extend(self.dl_tdoa_measurements) 4151 return SessionInfoNtf.serialize(self, payload = bytes(_span)) 4152 4153 @property 4154 def size(self) -> int: 4155 return len(self.dl_tdoa_measurements) * 1 + 1 4156 4157@dataclass 4158class ExtendedMacDlTDoASessionInfoNtf(SessionInfoNtf): 4159 no_of_ranging_measurements: int = field(kw_only=True, default=0) 4160 dl_tdoa_measurements: bytearray = field(kw_only=True, default_factory=bytearray) 4161 4162 def __post_init__(self): 4163 self.ranging_measurement_type = RangingMeasurementType.DL_TDOA 4164 self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS 4165 self.mt = MessageType.NOTIFICATION 4166 self.oid = SessionControlOpcodeId.START 4167 self.gid = GroupId.SESSION_CONTROL 4168 4169 @staticmethod 4170 def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacDlTDoASessionInfoNtf', bytes]: 4171 if fields['ranging_measurement_type'] != RangingMeasurementType.DL_TDOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 4172 raise Exception("Invalid constraint field values") 4173 if len(span) < 1: 4174 raise Exception('Invalid packet size') 4175 fields['no_of_ranging_measurements'] = span[0] 4176 span = span[1:] 4177 fields['dl_tdoa_measurements'] = list(span) 4178 span = bytes() 4179 return ExtendedMacDlTDoASessionInfoNtf(**fields), span 4180 4181 def serialize(self, payload: bytes = None) -> bytes: 4182 _span = bytearray() 4183 if self.no_of_ranging_measurements > 255: 4184 print(f"Invalid value for field ExtendedMacDlTDoASessionInfoNtf::no_of_ranging_measurements: {self.no_of_ranging_measurements} > 255; the value will be truncated") 4185 self.no_of_ranging_measurements &= 255 4186 _span.append((self.no_of_ranging_measurements << 0)) 4187 _span.extend(self.dl_tdoa_measurements) 4188 return SessionInfoNtf.serialize(self, payload = bytes(_span)) 4189 4190 @property 4191 def size(self) -> int: 4192 return len(self.dl_tdoa_measurements) * 1 + 1 4193 4194@dataclass 4195class ShortMacOwrAoaSessionInfoNtf(SessionInfoNtf): 4196 owr_aoa_ranging_measurements: List[ShortAddressOwrAoaRangingMeasurement] = field(kw_only=True, default_factory=list) 4197 vendor_data: bytearray = field(kw_only=True, default_factory=bytearray) 4198 4199 def __post_init__(self): 4200 self.ranging_measurement_type = RangingMeasurementType.OWR_AOA 4201 self.mac_address_indicator = MacAddressIndicator.SHORT_ADDRESS 4202 self.mt = MessageType.NOTIFICATION 4203 self.oid = SessionControlOpcodeId.START 4204 self.gid = GroupId.SESSION_CONTROL 4205 4206 @staticmethod 4207 def parse(fields: dict, span: bytes) -> Tuple['ShortMacOwrAoaSessionInfoNtf', bytes]: 4208 if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.SHORT_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 4209 raise Exception("Invalid constraint field values") 4210 if len(span) < 1: 4211 raise Exception('Invalid packet size') 4212 owr_aoa_ranging_measurements_count = span[0] 4213 span = span[1:] 4214 if len(span) < owr_aoa_ranging_measurements_count * 13: 4215 raise Exception('Invalid packet size') 4216 owr_aoa_ranging_measurements = [] 4217 for n in range(owr_aoa_ranging_measurements_count): 4218 owr_aoa_ranging_measurements.append(ShortAddressOwrAoaRangingMeasurement.parse_all(span[n * 13:(n + 1) * 13])) 4219 fields['owr_aoa_ranging_measurements'] = owr_aoa_ranging_measurements 4220 span = span[owr_aoa_ranging_measurements_count * 13:] 4221 fields['vendor_data'] = list(span) 4222 span = bytes() 4223 return ShortMacOwrAoaSessionInfoNtf(**fields), span 4224 4225 def serialize(self, payload: bytes = None) -> bytes: 4226 _span = bytearray() 4227 if len(self.owr_aoa_ranging_measurements) > 255: 4228 print(f"Invalid length for field ShortMacOwrAoaSessionInfoNtf::owr_aoa_ranging_measurements: {len(self.owr_aoa_ranging_measurements)} > 255; the array will be truncated") 4229 del self.owr_aoa_ranging_measurements[255:] 4230 _span.append((len(self.owr_aoa_ranging_measurements) << 0)) 4231 for _elt in self.owr_aoa_ranging_measurements: 4232 _span.extend(_elt.serialize()) 4233 _span.extend(self.vendor_data) 4234 return SessionInfoNtf.serialize(self, payload = bytes(_span)) 4235 4236 @property 4237 def size(self) -> int: 4238 return 1 + ( 4239 sum([elt.size for elt in self.owr_aoa_ranging_measurements]) + 4240 len(self.vendor_data) * 1 4241 ) 4242 4243@dataclass 4244class ExtendedMacOwrAoaSessionInfoNtf(SessionInfoNtf): 4245 owr_aoa_ranging_measurements: List[ExtendedAddressOwrAoaRangingMeasurement] = field(kw_only=True, default_factory=list) 4246 vendor_data: bytearray = field(kw_only=True, default_factory=bytearray) 4247 4248 def __post_init__(self): 4249 self.ranging_measurement_type = RangingMeasurementType.OWR_AOA 4250 self.mac_address_indicator = MacAddressIndicator.EXTENDED_ADDRESS 4251 self.mt = MessageType.NOTIFICATION 4252 self.oid = SessionControlOpcodeId.START 4253 self.gid = GroupId.SESSION_CONTROL 4254 4255 @staticmethod 4256 def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacOwrAoaSessionInfoNtf', bytes]: 4257 if fields['ranging_measurement_type'] != RangingMeasurementType.OWR_AOA or fields['mac_address_indicator'] != MacAddressIndicator.EXTENDED_ADDRESS or fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != SessionControlOpcodeId.START or fields['gid'] != GroupId.SESSION_CONTROL: 4258 raise Exception("Invalid constraint field values") 4259 if len(span) < 1: 4260 raise Exception('Invalid packet size') 4261 owr_aoa_ranging_measurements_count = span[0] 4262 span = span[1:] 4263 if len(span) < owr_aoa_ranging_measurements_count * 19: 4264 raise Exception('Invalid packet size') 4265 owr_aoa_ranging_measurements = [] 4266 for n in range(owr_aoa_ranging_measurements_count): 4267 owr_aoa_ranging_measurements.append(ExtendedAddressOwrAoaRangingMeasurement.parse_all(span[n * 19:(n + 1) * 19])) 4268 fields['owr_aoa_ranging_measurements'] = owr_aoa_ranging_measurements 4269 span = span[owr_aoa_ranging_measurements_count * 19:] 4270 fields['vendor_data'] = list(span) 4271 span = bytes() 4272 return ExtendedMacOwrAoaSessionInfoNtf(**fields), span 4273 4274 def serialize(self, payload: bytes = None) -> bytes: 4275 _span = bytearray() 4276 if len(self.owr_aoa_ranging_measurements) > 255: 4277 print(f"Invalid length for field ExtendedMacOwrAoaSessionInfoNtf::owr_aoa_ranging_measurements: {len(self.owr_aoa_ranging_measurements)} > 255; the array will be truncated") 4278 del self.owr_aoa_ranging_measurements[255:] 4279 _span.append((len(self.owr_aoa_ranging_measurements) << 0)) 4280 for _elt in self.owr_aoa_ranging_measurements: 4281 _span.extend(_elt.serialize()) 4282 _span.extend(self.vendor_data) 4283 return SessionInfoNtf.serialize(self, payload = bytes(_span)) 4284 4285 @property 4286 def size(self) -> int: 4287 return 1 + ( 4288 sum([elt.size for elt in self.owr_aoa_ranging_measurements]) + 4289 len(self.vendor_data) * 1 4290 ) 4291 4292@dataclass 4293class SessionStopCmd(SessionControlPacket): 4294 session_id: int = field(kw_only=True, default=0) 4295 4296 def __post_init__(self): 4297 self.mt = MessageType.COMMAND 4298 self.oid = SessionControlOpcodeId.STOP 4299 self.gid = GroupId.SESSION_CONTROL 4300 4301 @staticmethod 4302 def parse(fields: dict, span: bytes) -> Tuple['SessionStopCmd', bytes]: 4303 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL: 4304 raise Exception("Invalid constraint field values") 4305 if len(span) < 4: 4306 raise Exception('Invalid packet size') 4307 value_ = int.from_bytes(span[0:4], byteorder='little') 4308 fields['session_id'] = value_ 4309 span = span[4:] 4310 return SessionStopCmd(**fields), span 4311 4312 def serialize(self, payload: bytes = None) -> bytes: 4313 _span = bytearray() 4314 if self.session_id > 4294967295: 4315 print(f"Invalid value for field SessionStopCmd::session_id: {self.session_id} > 4294967295; the value will be truncated") 4316 self.session_id &= 4294967295 4317 _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) 4318 return SessionControlPacket.serialize(self, payload = bytes(_span)) 4319 4320 @property 4321 def size(self) -> int: 4322 return 4 4323 4324@dataclass 4325class SessionStopRsp(SessionControlPacket): 4326 status: Status = field(kw_only=True, default=Status.OK) 4327 4328 def __post_init__(self): 4329 self.mt = MessageType.RESPONSE 4330 self.oid = SessionControlOpcodeId.STOP 4331 self.gid = GroupId.SESSION_CONTROL 4332 4333 @staticmethod 4334 def parse(fields: dict, span: bytes) -> Tuple['SessionStopRsp', bytes]: 4335 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.STOP or fields['gid'] != GroupId.SESSION_CONTROL: 4336 raise Exception("Invalid constraint field values") 4337 if len(span) < 1: 4338 raise Exception('Invalid packet size') 4339 fields['status'] = Status.from_int(span[0]) 4340 span = span[1:] 4341 return SessionStopRsp(**fields), span 4342 4343 def serialize(self, payload: bytes = None) -> bytes: 4344 _span = bytearray() 4345 _span.append((self.status << 0)) 4346 return SessionControlPacket.serialize(self, payload = bytes(_span)) 4347 4348 @property 4349 def size(self) -> int: 4350 return 1 4351 4352@dataclass 4353class SessionGetRangingCountCmd(SessionControlPacket): 4354 session_id: int = field(kw_only=True, default=0) 4355 4356 def __post_init__(self): 4357 self.mt = MessageType.COMMAND 4358 self.oid = SessionControlOpcodeId.GET_RANGING_COUNT 4359 self.gid = GroupId.SESSION_CONTROL 4360 4361 @staticmethod 4362 def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountCmd', bytes]: 4363 if fields['mt'] != MessageType.COMMAND or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL: 4364 raise Exception("Invalid constraint field values") 4365 if len(span) < 4: 4366 raise Exception('Invalid packet size') 4367 value_ = int.from_bytes(span[0:4], byteorder='little') 4368 fields['session_id'] = value_ 4369 span = span[4:] 4370 return SessionGetRangingCountCmd(**fields), span 4371 4372 def serialize(self, payload: bytes = None) -> bytes: 4373 _span = bytearray() 4374 if self.session_id > 4294967295: 4375 print(f"Invalid value for field SessionGetRangingCountCmd::session_id: {self.session_id} > 4294967295; the value will be truncated") 4376 self.session_id &= 4294967295 4377 _span.extend(int.to_bytes((self.session_id << 0), length=4, byteorder='little')) 4378 return SessionControlPacket.serialize(self, payload = bytes(_span)) 4379 4380 @property 4381 def size(self) -> int: 4382 return 4 4383 4384@dataclass 4385class SessionGetRangingCountRsp(SessionControlPacket): 4386 status: Status = field(kw_only=True, default=Status.OK) 4387 count: int = field(kw_only=True, default=0) 4388 4389 def __post_init__(self): 4390 self.mt = MessageType.RESPONSE 4391 self.oid = SessionControlOpcodeId.GET_RANGING_COUNT 4392 self.gid = GroupId.SESSION_CONTROL 4393 4394 @staticmethod 4395 def parse(fields: dict, span: bytes) -> Tuple['SessionGetRangingCountRsp', bytes]: 4396 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != SessionControlOpcodeId.GET_RANGING_COUNT or fields['gid'] != GroupId.SESSION_CONTROL: 4397 raise Exception("Invalid constraint field values") 4398 if len(span) < 5: 4399 raise Exception('Invalid packet size') 4400 fields['status'] = Status.from_int(span[0]) 4401 value_ = int.from_bytes(span[1:5], byteorder='little') 4402 fields['count'] = value_ 4403 span = span[5:] 4404 return SessionGetRangingCountRsp(**fields), span 4405 4406 def serialize(self, payload: bytes = None) -> bytes: 4407 _span = bytearray() 4408 _span.append((self.status << 0)) 4409 if self.count > 4294967295: 4410 print(f"Invalid value for field SessionGetRangingCountRsp::count: {self.count} > 4294967295; the value will be truncated") 4411 self.count &= 4294967295 4412 _span.extend(int.to_bytes((self.count << 0), length=4, byteorder='little')) 4413 return SessionControlPacket.serialize(self, payload = bytes(_span)) 4414 4415 @property 4416 def size(self) -> int: 4417 return 5 4418 4419@dataclass 4420class AndroidGetPowerStatsCmd(AndroidPacket): 4421 4422 4423 def __post_init__(self): 4424 self.mt = MessageType.COMMAND 4425 self.oid = AndroidOpcodeId.GET_POWER_STATS 4426 self.gid = GroupId.VENDOR_ANDROID 4427 4428 @staticmethod 4429 def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsCmd', bytes]: 4430 if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID: 4431 raise Exception("Invalid constraint field values") 4432 return AndroidGetPowerStatsCmd(**fields), span 4433 4434 def serialize(self, payload: bytes = None) -> bytes: 4435 _span = bytearray() 4436 return AndroidPacket.serialize(self, payload = bytes(_span)) 4437 4438 @property 4439 def size(self) -> int: 4440 return 0 4441 4442@dataclass 4443class PowerStats(Packet): 4444 status: Status = field(kw_only=True, default=Status.OK) 4445 idle_time_ms: int = field(kw_only=True, default=0) 4446 tx_time_ms: int = field(kw_only=True, default=0) 4447 rx_time_ms: int = field(kw_only=True, default=0) 4448 total_wake_count: int = field(kw_only=True, default=0) 4449 4450 def __post_init__(self): 4451 pass 4452 4453 @staticmethod 4454 def parse(span: bytes) -> Tuple['PowerStats', bytes]: 4455 fields = {'payload': None} 4456 if len(span) < 17: 4457 raise Exception('Invalid packet size') 4458 fields['status'] = Status.from_int(span[0]) 4459 value_ = int.from_bytes(span[1:5], byteorder='little') 4460 fields['idle_time_ms'] = value_ 4461 value_ = int.from_bytes(span[5:9], byteorder='little') 4462 fields['tx_time_ms'] = value_ 4463 value_ = int.from_bytes(span[9:13], byteorder='little') 4464 fields['rx_time_ms'] = value_ 4465 value_ = int.from_bytes(span[13:17], byteorder='little') 4466 fields['total_wake_count'] = value_ 4467 span = span[17:] 4468 return PowerStats(**fields), span 4469 4470 def serialize(self, payload: bytes = None) -> bytes: 4471 _span = bytearray() 4472 _span.append((self.status << 0)) 4473 if self.idle_time_ms > 4294967295: 4474 print(f"Invalid value for field PowerStats::idle_time_ms: {self.idle_time_ms} > 4294967295; the value will be truncated") 4475 self.idle_time_ms &= 4294967295 4476 _span.extend(int.to_bytes((self.idle_time_ms << 0), length=4, byteorder='little')) 4477 if self.tx_time_ms > 4294967295: 4478 print(f"Invalid value for field PowerStats::tx_time_ms: {self.tx_time_ms} > 4294967295; the value will be truncated") 4479 self.tx_time_ms &= 4294967295 4480 _span.extend(int.to_bytes((self.tx_time_ms << 0), length=4, byteorder='little')) 4481 if self.rx_time_ms > 4294967295: 4482 print(f"Invalid value for field PowerStats::rx_time_ms: {self.rx_time_ms} > 4294967295; the value will be truncated") 4483 self.rx_time_ms &= 4294967295 4484 _span.extend(int.to_bytes((self.rx_time_ms << 0), length=4, byteorder='little')) 4485 if self.total_wake_count > 4294967295: 4486 print(f"Invalid value for field PowerStats::total_wake_count: {self.total_wake_count} > 4294967295; the value will be truncated") 4487 self.total_wake_count &= 4294967295 4488 _span.extend(int.to_bytes((self.total_wake_count << 0), length=4, byteorder='little')) 4489 return bytes(_span) 4490 4491 @property 4492 def size(self) -> int: 4493 return 17 4494 4495@dataclass 4496class AndroidGetPowerStatsRsp(AndroidPacket): 4497 stats: PowerStats = field(kw_only=True, default_factory=PowerStats) 4498 4499 def __post_init__(self): 4500 self.mt = MessageType.RESPONSE 4501 self.oid = AndroidOpcodeId.GET_POWER_STATS 4502 self.gid = GroupId.VENDOR_ANDROID 4503 4504 @staticmethod 4505 def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsRsp', bytes]: 4506 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.GET_POWER_STATS or fields['gid'] != GroupId.VENDOR_ANDROID: 4507 raise Exception("Invalid constraint field values") 4508 if len(span) < 17: 4509 raise Exception('Invalid packet size') 4510 fields['stats'] = PowerStats.parse_all(span[0:17]) 4511 span = span[17:] 4512 return AndroidGetPowerStatsRsp(**fields), span 4513 4514 def serialize(self, payload: bytes = None) -> bytes: 4515 _span = bytearray() 4516 _span.extend(self.stats.serialize()) 4517 return AndroidPacket.serialize(self, payload = bytes(_span)) 4518 4519 @property 4520 def size(self) -> int: 4521 return 17 4522 4523@dataclass 4524class AndroidSetCountryCodeCmd(AndroidPacket): 4525 country_code: bytearray = field(kw_only=True, default_factory=bytearray) 4526 4527 def __post_init__(self): 4528 self.mt = MessageType.COMMAND 4529 self.oid = AndroidOpcodeId.SET_COUNTRY_CODE 4530 self.gid = GroupId.VENDOR_ANDROID 4531 4532 @staticmethod 4533 def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeCmd', bytes]: 4534 if fields['mt'] != MessageType.COMMAND or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID: 4535 raise Exception("Invalid constraint field values") 4536 if len(span) < 2: 4537 raise Exception('Invalid packet size') 4538 fields['country_code'] = list(span[:2]) 4539 span = span[2:] 4540 return AndroidSetCountryCodeCmd(**fields), span 4541 4542 def serialize(self, payload: bytes = None) -> bytes: 4543 _span = bytearray() 4544 _span.extend(self.country_code) 4545 return AndroidPacket.serialize(self, payload = bytes(_span)) 4546 4547 @property 4548 def size(self) -> int: 4549 return 2 4550 4551@dataclass 4552class AndroidSetCountryCodeRsp(AndroidPacket): 4553 status: Status = field(kw_only=True, default=Status.OK) 4554 4555 def __post_init__(self): 4556 self.mt = MessageType.RESPONSE 4557 self.oid = AndroidOpcodeId.SET_COUNTRY_CODE 4558 self.gid = GroupId.VENDOR_ANDROID 4559 4560 @staticmethod 4561 def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeRsp', bytes]: 4562 if fields['mt'] != MessageType.RESPONSE or fields['oid'] != AndroidOpcodeId.SET_COUNTRY_CODE or fields['gid'] != GroupId.VENDOR_ANDROID: 4563 raise Exception("Invalid constraint field values") 4564 if len(span) < 1: 4565 raise Exception('Invalid packet size') 4566 fields['status'] = Status.from_int(span[0]) 4567 span = span[1:] 4568 return AndroidSetCountryCodeRsp(**fields), span 4569 4570 def serialize(self, payload: bytes = None) -> bytes: 4571 _span = bytearray() 4572 _span.append((self.status << 0)) 4573 return AndroidPacket.serialize(self, payload = bytes(_span)) 4574 4575 @property 4576 def size(self) -> int: 4577 return 1 4578 4579class FrameReportTlvType(enum.IntEnum): 4580 RSSI = 0x0 4581 AOA = 0x1 4582 CIR = 0x2 4583 4584 @staticmethod 4585 def from_int(v: int) -> Union[int, 'FrameReportTlvType']: 4586 try: 4587 return FrameReportTlvType(v) 4588 except ValueError as exn: 4589 raise exn 4590 4591 4592@dataclass 4593class FrameReportTlv(Packet): 4594 t: FrameReportTlvType = field(kw_only=True, default=FrameReportTlvType.RSSI) 4595 v: bytearray = field(kw_only=True, default_factory=bytearray) 4596 4597 def __post_init__(self): 4598 pass 4599 4600 @staticmethod 4601 def parse(span: bytes) -> Tuple['FrameReportTlv', bytes]: 4602 fields = {'payload': None} 4603 if len(span) < 3: 4604 raise Exception('Invalid packet size') 4605 fields['t'] = FrameReportTlvType.from_int(span[0]) 4606 value_ = int.from_bytes(span[1:3], byteorder='little') 4607 v_size = value_ 4608 span = span[3:] 4609 if len(span) < v_size: 4610 raise Exception('Invalid packet size') 4611 fields['v'] = list(span[:v_size]) 4612 span = span[v_size:] 4613 return FrameReportTlv(**fields), span 4614 4615 def serialize(self, payload: bytes = None) -> bytes: 4616 _span = bytearray() 4617 _span.append((self.t << 0)) 4618 _span.extend(int.to_bytes(((len(self.v) * 1) << 0), length=2, byteorder='little')) 4619 _span.extend(self.v) 4620 return bytes(_span) 4621 4622 @property 4623 def size(self) -> int: 4624 return len(self.v) * 1 + 3 4625 4626@dataclass 4627class FrameReportTlvPacket(Packet): 4628 t: FrameReportTlvType = field(kw_only=True, default=FrameReportTlvType.RSSI) 4629 4630 def __post_init__(self): 4631 pass 4632 4633 @staticmethod 4634 def parse(span: bytes) -> Tuple['FrameReportTlvPacket', bytes]: 4635 fields = {'payload': None} 4636 if len(span) < 3: 4637 raise Exception('Invalid packet size') 4638 fields['t'] = FrameReportTlvType.from_int(span[0]) 4639 value_ = int.from_bytes(span[1:3], byteorder='little') 4640 _body__size = value_ 4641 span = span[3:] 4642 if len(span) < _body__size: 4643 raise Exception('Invalid packet size') 4644 payload = span[:_body__size] 4645 span = span[_body__size:] 4646 fields['payload'] = payload 4647 try: 4648 return Rssi.parse(fields.copy(), payload) 4649 except Exception as exn: 4650 pass 4651 try: 4652 return Aoa.parse(fields.copy(), payload) 4653 except Exception as exn: 4654 pass 4655 try: 4656 return Cir.parse(fields.copy(), payload) 4657 except Exception as exn: 4658 pass 4659 return FrameReportTlvPacket(**fields), span 4660 4661 def serialize(self, payload: bytes = None) -> bytes: 4662 _span = bytearray() 4663 _span.append((self.t << 0)) 4664 _payload_size = len(payload or self.payload or []) 4665 if _payload_size > 65535: 4666 print(f"Invalid length for payload field: {_payload_size} > 65535; the packet cannot be generated") 4667 raise Exception("Invalid payload length") 4668 _span.extend(int.to_bytes((_payload_size << 0), length=2, byteorder='little')) 4669 _span.extend(payload or self.payload or []) 4670 return bytes(_span) 4671 4672 @property 4673 def size(self) -> int: 4674 return len(self.payload) + 3 4675 4676@dataclass 4677class Rssi(FrameReportTlvPacket): 4678 rssi: bytearray = field(kw_only=True, default_factory=bytearray) 4679 4680 def __post_init__(self): 4681 self.t = FrameReportTlvType.RSSI 4682 4683 @staticmethod 4684 def parse(fields: dict, span: bytes) -> Tuple['Rssi', bytes]: 4685 if fields['t'] != FrameReportTlvType.RSSI: 4686 raise Exception("Invalid constraint field values") 4687 fields['rssi'] = list(span) 4688 span = bytes() 4689 return Rssi(**fields), span 4690 4691 def serialize(self, payload: bytes = None) -> bytes: 4692 _span = bytearray() 4693 _span.extend(self.rssi) 4694 return FrameReportTlvPacket.serialize(self, payload = bytes(_span)) 4695 4696 @property 4697 def size(self) -> int: 4698 return len(self.rssi) * 1 4699 4700@dataclass 4701class AoaMeasurement(Packet): 4702 tdoa: int = field(kw_only=True, default=0) 4703 pdoa: int = field(kw_only=True, default=0) 4704 aoa: int = field(kw_only=True, default=0) 4705 fom: int = field(kw_only=True, default=0) 4706 t: int = field(kw_only=True, default=0) 4707 4708 def __post_init__(self): 4709 pass 4710 4711 @staticmethod 4712 def parse(span: bytes) -> Tuple['AoaMeasurement', bytes]: 4713 fields = {'payload': None} 4714 if len(span) < 8: 4715 raise Exception('Invalid packet size') 4716 value_ = int.from_bytes(span[0:2], byteorder='little') 4717 fields['tdoa'] = value_ 4718 value_ = int.from_bytes(span[2:4], byteorder='little') 4719 fields['pdoa'] = value_ 4720 value_ = int.from_bytes(span[4:6], byteorder='little') 4721 fields['aoa'] = value_ 4722 fields['fom'] = span[6] 4723 fields['t'] = span[7] 4724 span = span[8:] 4725 return AoaMeasurement(**fields), span 4726 4727 def serialize(self, payload: bytes = None) -> bytes: 4728 _span = bytearray() 4729 if self.tdoa > 65535: 4730 print(f"Invalid value for field AoaMeasurement::tdoa: {self.tdoa} > 65535; the value will be truncated") 4731 self.tdoa &= 65535 4732 _span.extend(int.to_bytes((self.tdoa << 0), length=2, byteorder='little')) 4733 if self.pdoa > 65535: 4734 print(f"Invalid value for field AoaMeasurement::pdoa: {self.pdoa} > 65535; the value will be truncated") 4735 self.pdoa &= 65535 4736 _span.extend(int.to_bytes((self.pdoa << 0), length=2, byteorder='little')) 4737 if self.aoa > 65535: 4738 print(f"Invalid value for field AoaMeasurement::aoa: {self.aoa} > 65535; the value will be truncated") 4739 self.aoa &= 65535 4740 _span.extend(int.to_bytes((self.aoa << 0), length=2, byteorder='little')) 4741 if self.fom > 255: 4742 print(f"Invalid value for field AoaMeasurement::fom: {self.fom} > 255; the value will be truncated") 4743 self.fom &= 255 4744 _span.append((self.fom << 0)) 4745 if self.t > 255: 4746 print(f"Invalid value for field AoaMeasurement::t: {self.t} > 255; the value will be truncated") 4747 self.t &= 255 4748 _span.append((self.t << 0)) 4749 return bytes(_span) 4750 4751 @property 4752 def size(self) -> int: 4753 return 8 4754 4755@dataclass 4756class Aoa(FrameReportTlvPacket): 4757 aoa: List[AoaMeasurement] = field(kw_only=True, default_factory=list) 4758 4759 def __post_init__(self): 4760 self.t = FrameReportTlvType.AOA 4761 4762 @staticmethod 4763 def parse(fields: dict, span: bytes) -> Tuple['Aoa', bytes]: 4764 if fields['t'] != FrameReportTlvType.AOA: 4765 raise Exception("Invalid constraint field values") 4766 if len(span) % 8 != 0: 4767 raise Exception('Array size is not a multiple of the element size') 4768 aoa_count = int(len(span) / 8) 4769 aoa = [] 4770 for n in range(aoa_count): 4771 aoa.append(AoaMeasurement.parse_all(span[n * 8:(n + 1) * 8])) 4772 fields['aoa'] = aoa 4773 span = bytes() 4774 return Aoa(**fields), span 4775 4776 def serialize(self, payload: bytes = None) -> bytes: 4777 _span = bytearray() 4778 for _elt in self.aoa: 4779 _span.extend(_elt.serialize()) 4780 return FrameReportTlvPacket.serialize(self, payload = bytes(_span)) 4781 4782 @property 4783 def size(self) -> int: 4784 return sum([elt.size for elt in self.aoa]) 4785 4786@dataclass 4787class CirValue(Packet): 4788 first_path_index: int = field(kw_only=True, default=0) 4789 first_path_snr: int = field(kw_only=True, default=0) 4790 first_path_ns: int = field(kw_only=True, default=0) 4791 peak_path_index: int = field(kw_only=True, default=0) 4792 peak_path_snr: int = field(kw_only=True, default=0) 4793 peak_path_ns: int = field(kw_only=True, default=0) 4794 first_path_sample_offset: int = field(kw_only=True, default=0) 4795 samples_number: int = field(kw_only=True, default=0) 4796 sample_window: bytearray = field(kw_only=True, default_factory=bytearray) 4797 4798 def __post_init__(self): 4799 pass 4800 4801 @staticmethod 4802 def parse(span: bytes) -> Tuple['CirValue', bytes]: 4803 fields = {'payload': None} 4804 if len(span) < 16: 4805 raise Exception('Invalid packet size') 4806 value_ = int.from_bytes(span[0:2], byteorder='little') 4807 fields['first_path_index'] = value_ 4808 value_ = int.from_bytes(span[2:4], byteorder='little') 4809 fields['first_path_snr'] = value_ 4810 value_ = int.from_bytes(span[4:6], byteorder='little') 4811 fields['first_path_ns'] = value_ 4812 value_ = int.from_bytes(span[6:8], byteorder='little') 4813 fields['peak_path_index'] = value_ 4814 value_ = int.from_bytes(span[8:10], byteorder='little') 4815 fields['peak_path_snr'] = value_ 4816 value_ = int.from_bytes(span[10:12], byteorder='little') 4817 fields['peak_path_ns'] = value_ 4818 fields['first_path_sample_offset'] = span[12] 4819 fields['samples_number'] = span[13] 4820 value_ = int.from_bytes(span[14:16], byteorder='little') 4821 sample_window_size = value_ 4822 span = span[16:] 4823 if len(span) < sample_window_size: 4824 raise Exception('Invalid packet size') 4825 fields['sample_window'] = list(span[:sample_window_size]) 4826 span = span[sample_window_size:] 4827 return CirValue(**fields), span 4828 4829 def serialize(self, payload: bytes = None) -> bytes: 4830 _span = bytearray() 4831 if self.first_path_index > 65535: 4832 print(f"Invalid value for field CirValue::first_path_index: {self.first_path_index} > 65535; the value will be truncated") 4833 self.first_path_index &= 65535 4834 _span.extend(int.to_bytes((self.first_path_index << 0), length=2, byteorder='little')) 4835 if self.first_path_snr > 65535: 4836 print(f"Invalid value for field CirValue::first_path_snr: {self.first_path_snr} > 65535; the value will be truncated") 4837 self.first_path_snr &= 65535 4838 _span.extend(int.to_bytes((self.first_path_snr << 0), length=2, byteorder='little')) 4839 if self.first_path_ns > 65535: 4840 print(f"Invalid value for field CirValue::first_path_ns: {self.first_path_ns} > 65535; the value will be truncated") 4841 self.first_path_ns &= 65535 4842 _span.extend(int.to_bytes((self.first_path_ns << 0), length=2, byteorder='little')) 4843 if self.peak_path_index > 65535: 4844 print(f"Invalid value for field CirValue::peak_path_index: {self.peak_path_index} > 65535; the value will be truncated") 4845 self.peak_path_index &= 65535 4846 _span.extend(int.to_bytes((self.peak_path_index << 0), length=2, byteorder='little')) 4847 if self.peak_path_snr > 65535: 4848 print(f"Invalid value for field CirValue::peak_path_snr: {self.peak_path_snr} > 65535; the value will be truncated") 4849 self.peak_path_snr &= 65535 4850 _span.extend(int.to_bytes((self.peak_path_snr << 0), length=2, byteorder='little')) 4851 if self.peak_path_ns > 65535: 4852 print(f"Invalid value for field CirValue::peak_path_ns: {self.peak_path_ns} > 65535; the value will be truncated") 4853 self.peak_path_ns &= 65535 4854 _span.extend(int.to_bytes((self.peak_path_ns << 0), length=2, byteorder='little')) 4855 if self.first_path_sample_offset > 255: 4856 print(f"Invalid value for field CirValue::first_path_sample_offset: {self.first_path_sample_offset} > 255; the value will be truncated") 4857 self.first_path_sample_offset &= 255 4858 _span.append((self.first_path_sample_offset << 0)) 4859 if self.samples_number > 255: 4860 print(f"Invalid value for field CirValue::samples_number: {self.samples_number} > 255; the value will be truncated") 4861 self.samples_number &= 255 4862 _span.append((self.samples_number << 0)) 4863 _span.extend(int.to_bytes(((len(self.sample_window) * 1) << 0), length=2, byteorder='little')) 4864 _span.extend(self.sample_window) 4865 return bytes(_span) 4866 4867 @property 4868 def size(self) -> int: 4869 return len(self.sample_window) * 1 + 16 4870 4871@dataclass 4872class Cir(FrameReportTlvPacket): 4873 cir_value: List[CirValue] = field(kw_only=True, default_factory=list) 4874 4875 def __post_init__(self): 4876 self.t = FrameReportTlvType.CIR 4877 4878 @staticmethod 4879 def parse(fields: dict, span: bytes) -> Tuple['Cir', bytes]: 4880 if fields['t'] != FrameReportTlvType.CIR: 4881 raise Exception("Invalid constraint field values") 4882 if len(span) < 1: 4883 raise Exception('Invalid packet size') 4884 cir_value_count = span[0] 4885 span = span[1:] 4886 cir_value = [] 4887 for n in range(cir_value_count): 4888 element, span = CirValue.parse(span) 4889 cir_value.append(element) 4890 fields['cir_value'] = cir_value 4891 return Cir(**fields), span 4892 4893 def serialize(self, payload: bytes = None) -> bytes: 4894 _span = bytearray() 4895 if len(self.cir_value) > 255: 4896 print(f"Invalid length for field Cir::cir_value: {len(self.cir_value)} > 255; the array will be truncated") 4897 del self.cir_value[255:] 4898 _span.append((len(self.cir_value) << 0)) 4899 for _elt in self.cir_value: 4900 _span.extend(_elt.serialize()) 4901 return FrameReportTlvPacket.serialize(self, payload = bytes(_span)) 4902 4903 @property 4904 def size(self) -> int: 4905 return sum([elt.size for elt in self.cir_value]) + 1 4906 4907@dataclass 4908class FrameReport(Packet): 4909 uwb_msg_id: int = field(kw_only=True, default=0) 4910 action: int = field(kw_only=True, default=0) 4911 antenna_set: int = field(kw_only=True, default=0) 4912 frame_report_tlvs: List[FrameReportTlv] = field(kw_only=True, default_factory=list) 4913 4914 def __post_init__(self): 4915 pass 4916 4917 @staticmethod 4918 def parse(span: bytes) -> Tuple['FrameReport', bytes]: 4919 fields = {'payload': None} 4920 if len(span) < 4: 4921 raise Exception('Invalid packet size') 4922 fields['uwb_msg_id'] = span[0] 4923 fields['action'] = span[1] 4924 fields['antenna_set'] = span[2] 4925 frame_report_tlvs_count = span[3] 4926 span = span[4:] 4927 frame_report_tlvs = [] 4928 for n in range(frame_report_tlvs_count): 4929 element, span = FrameReportTlv.parse(span) 4930 frame_report_tlvs.append(element) 4931 fields['frame_report_tlvs'] = frame_report_tlvs 4932 return FrameReport(**fields), span 4933 4934 def serialize(self, payload: bytes = None) -> bytes: 4935 _span = bytearray() 4936 if self.uwb_msg_id > 255: 4937 print(f"Invalid value for field FrameReport::uwb_msg_id: {self.uwb_msg_id} > 255; the value will be truncated") 4938 self.uwb_msg_id &= 255 4939 _span.append((self.uwb_msg_id << 0)) 4940 if self.action > 255: 4941 print(f"Invalid value for field FrameReport::action: {self.action} > 255; the value will be truncated") 4942 self.action &= 255 4943 _span.append((self.action << 0)) 4944 if self.antenna_set > 255: 4945 print(f"Invalid value for field FrameReport::antenna_set: {self.antenna_set} > 255; the value will be truncated") 4946 self.antenna_set &= 255 4947 _span.append((self.antenna_set << 0)) 4948 if len(self.frame_report_tlvs) > 255: 4949 print(f"Invalid length for field FrameReport::frame_report_tlvs: {len(self.frame_report_tlvs)} > 255; the array will be truncated") 4950 del self.frame_report_tlvs[255:] 4951 _span.append((len(self.frame_report_tlvs) << 0)) 4952 for _elt in self.frame_report_tlvs: 4953 _span.extend(_elt.serialize()) 4954 return bytes(_span) 4955 4956 @property 4957 def size(self) -> int: 4958 return sum([elt.size for elt in self.frame_report_tlvs]) + 4 4959 4960@dataclass 4961class AndroidRangeDiagnosticsNtf(AndroidPacket): 4962 session_token: int = field(kw_only=True, default=0) 4963 sequence_number: int = field(kw_only=True, default=0) 4964 frame_reports: List[FrameReport] = field(kw_only=True, default_factory=list) 4965 4966 def __post_init__(self): 4967 self.mt = MessageType.NOTIFICATION 4968 self.oid = AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS 4969 self.gid = GroupId.VENDOR_ANDROID 4970 4971 @staticmethod 4972 def parse(fields: dict, span: bytes) -> Tuple['AndroidRangeDiagnosticsNtf', bytes]: 4973 if fields['mt'] != MessageType.NOTIFICATION or fields['oid'] != AndroidOpcodeId.FIRA_RANGE_DIAGNOSTICS or fields['gid'] != GroupId.VENDOR_ANDROID: 4974 raise Exception("Invalid constraint field values") 4975 if len(span) < 9: 4976 raise Exception('Invalid packet size') 4977 value_ = int.from_bytes(span[0:4], byteorder='little') 4978 fields['session_token'] = value_ 4979 value_ = int.from_bytes(span[4:8], byteorder='little') 4980 fields['sequence_number'] = value_ 4981 frame_reports_count = span[8] 4982 span = span[9:] 4983 frame_reports = [] 4984 for n in range(frame_reports_count): 4985 element, span = FrameReport.parse(span) 4986 frame_reports.append(element) 4987 fields['frame_reports'] = frame_reports 4988 return AndroidRangeDiagnosticsNtf(**fields), span 4989 4990 def serialize(self, payload: bytes = None) -> bytes: 4991 _span = bytearray() 4992 if self.session_token > 4294967295: 4993 print(f"Invalid value for field AndroidRangeDiagnosticsNtf::session_token: {self.session_token} > 4294967295; the value will be truncated") 4994 self.session_token &= 4294967295 4995 _span.extend(int.to_bytes((self.session_token << 0), length=4, byteorder='little')) 4996 if self.sequence_number > 4294967295: 4997 print(f"Invalid value for field AndroidRangeDiagnosticsNtf::sequence_number: {self.sequence_number} > 4294967295; the value will be truncated") 4998 self.sequence_number &= 4294967295 4999 _span.extend(int.to_bytes((self.sequence_number << 0), length=4, byteorder='little')) 5000 if len(self.frame_reports) > 255: 5001 print(f"Invalid length for field AndroidRangeDiagnosticsNtf::frame_reports: {len(self.frame_reports)} > 255; the array will be truncated") 5002 del self.frame_reports[255:] 5003 _span.append((len(self.frame_reports) << 0)) 5004 for _elt in self.frame_reports: 5005 _span.extend(_elt.serialize()) 5006 return AndroidPacket.serialize(self, payload = bytes(_span)) 5007 5008 @property 5009 def size(self) -> int: 5010 return sum([elt.size for elt in self.frame_reports]) + 9 5011