1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16# ----------------------------------------------------------------------------- 17# Imports 18# ----------------------------------------------------------------------------- 19from enum import IntEnum 20import struct 21 22from bumble import core 23from ..gatt_client import ProfileServiceProxy 24from ..att import ATT_Error 25from ..gatt import ( 26 GATT_HEART_RATE_SERVICE, 27 GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, 28 GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, 29 GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, 30 TemplateService, 31 Characteristic, 32 CharacteristicValue, 33 DelegatedCharacteristicAdapter, 34 PackedCharacteristicAdapter, 35) 36 37 38# ----------------------------------------------------------------------------- 39class HeartRateService(TemplateService): 40 UUID = GATT_HEART_RATE_SERVICE 41 HEART_RATE_CONTROL_POINT_FORMAT = 'B' 42 CONTROL_POINT_NOT_SUPPORTED = 0x80 43 RESET_ENERGY_EXPENDED = 0x01 44 45 class BodySensorLocation(IntEnum): 46 OTHER = 0 47 CHEST = 1 48 WRIST = 2 49 FINGER = 3 50 HAND = 4 51 EAR_LOBE = 5 52 FOOT = 6 53 54 class HeartRateMeasurement: 55 def __init__( 56 self, 57 heart_rate, 58 sensor_contact_detected=None, 59 energy_expended=None, 60 rr_intervals=None, 61 ): 62 if heart_rate < 0 or heart_rate > 0xFFFF: 63 raise core.InvalidArgumentError('heart_rate out of range') 64 65 if energy_expended is not None and ( 66 energy_expended < 0 or energy_expended > 0xFFFF 67 ): 68 raise core.InvalidArgumentError('energy_expended out of range') 69 70 if rr_intervals: 71 for rr_interval in rr_intervals: 72 if rr_interval < 0 or rr_interval * 1024 > 0xFFFF: 73 raise core.InvalidArgumentError('rr_intervals out of range') 74 75 self.heart_rate = heart_rate 76 self.sensor_contact_detected = sensor_contact_detected 77 self.energy_expended = energy_expended 78 self.rr_intervals = rr_intervals 79 80 @classmethod 81 def from_bytes(cls, data): 82 flags = data[0] 83 offset = 1 84 85 if flags & 1: 86 hr = struct.unpack_from('<H', data, offset)[0] 87 offset += 2 88 else: 89 hr = struct.unpack_from('B', data, offset)[0] 90 offset += 1 91 92 if flags & (1 << 2): 93 sensor_contact_detected = flags & (1 << 1) != 0 94 else: 95 sensor_contact_detected = None 96 97 if flags & (1 << 3): 98 energy_expended = struct.unpack_from('<H', data, offset)[0] 99 offset += 2 100 else: 101 energy_expended = None 102 103 if flags & (1 << 4): 104 rr_intervals = tuple( 105 struct.unpack_from('<H', data, offset + i * 2)[0] / 1024 106 for i in range((len(data) - offset) // 2) 107 ) 108 else: 109 rr_intervals = () 110 111 return cls(hr, sensor_contact_detected, energy_expended, rr_intervals) 112 113 def __bytes__(self): 114 if self.heart_rate < 256: 115 flags = 0 116 data = struct.pack('B', self.heart_rate) 117 else: 118 flags = 1 119 data = struct.pack('<H', self.heart_rate) 120 121 if self.sensor_contact_detected is not None: 122 flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2) 123 124 if self.energy_expended is not None: 125 flags |= 1 << 3 126 data += struct.pack('<H', self.energy_expended) 127 128 if self.rr_intervals: 129 flags |= 1 << 4 130 data += b''.join( 131 [ 132 struct.pack('<H', int(rr_interval * 1024)) 133 for rr_interval in self.rr_intervals 134 ] 135 ) 136 137 return bytes([flags]) + data 138 139 def __str__(self): 140 return ( 141 f'HeartRateMeasurement(heart_rate={self.heart_rate},' 142 f' sensor_contact_detected={self.sensor_contact_detected},' 143 f' energy_expended={self.energy_expended},' 144 f' rr_intervals={self.rr_intervals})' 145 ) 146 147 def __init__( 148 self, 149 read_heart_rate_measurement, 150 body_sensor_location=None, 151 reset_energy_expended=None, 152 ): 153 self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter( 154 Characteristic( 155 GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, 156 Characteristic.Properties.NOTIFY, 157 0, 158 CharacteristicValue(read=read_heart_rate_measurement), 159 ), 160 # pylint: disable=unnecessary-lambda 161 encode=lambda value: bytes(value), 162 ) 163 characteristics = [self.heart_rate_measurement_characteristic] 164 165 if body_sensor_location is not None: 166 self.body_sensor_location_characteristic = Characteristic( 167 GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, 168 Characteristic.Properties.READ, 169 Characteristic.READABLE, 170 bytes([int(body_sensor_location)]), 171 ) 172 characteristics.append(self.body_sensor_location_characteristic) 173 174 if reset_energy_expended: 175 176 def write_heart_rate_control_point_value(connection, value): 177 if value == self.RESET_ENERGY_EXPENDED: 178 if reset_energy_expended is not None: 179 reset_energy_expended(connection) 180 else: 181 raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED) 182 183 self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter( 184 Characteristic( 185 GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, 186 Characteristic.Properties.WRITE, 187 Characteristic.WRITEABLE, 188 CharacteristicValue(write=write_heart_rate_control_point_value), 189 ), 190 pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, 191 ) 192 characteristics.append(self.heart_rate_control_point_characteristic) 193 194 super().__init__(characteristics) 195 196 197# ----------------------------------------------------------------------------- 198class HeartRateServiceProxy(ProfileServiceProxy): 199 SERVICE_CLASS = HeartRateService 200 201 def __init__(self, service_proxy): 202 self.service_proxy = service_proxy 203 204 if characteristics := service_proxy.get_characteristics_by_uuid( 205 GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC 206 ): 207 self.heart_rate_measurement = DelegatedCharacteristicAdapter( 208 characteristics[0], 209 decode=HeartRateService.HeartRateMeasurement.from_bytes, 210 ) 211 else: 212 self.heart_rate_measurement = None 213 214 if characteristics := service_proxy.get_characteristics_by_uuid( 215 GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC 216 ): 217 self.body_sensor_location = DelegatedCharacteristicAdapter( 218 characteristics[0], 219 decode=lambda value: HeartRateService.BodySensorLocation(value[0]), 220 ) 221 else: 222 self.body_sensor_location = None 223 224 if characteristics := service_proxy.get_characteristics_by_uuid( 225 GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC 226 ): 227 self.heart_rate_control_point = PackedCharacteristicAdapter( 228 characteristics[0], 229 pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, 230 ) 231 else: 232 self.heart_rate_control_point = None 233 234 async def reset_energy_expended(self): 235 if self.heart_rate_control_point is not None: 236 return await self.heart_rate_control_point.write_value( 237 HeartRateService.RESET_ENERGY_EXPENDED 238 ) 239