1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16# -----------------------------------------------------------------------------
17# Imports
18# -----------------------------------------------------------------------------
19from enum import IntEnum
20import struct
21
22from bumble import core
23from ..gatt_client import ProfileServiceProxy
24from ..att import ATT_Error
25from ..gatt import (
26    GATT_HEART_RATE_SERVICE,
27    GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
28    GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
29    GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
30    TemplateService,
31    Characteristic,
32    CharacteristicValue,
33    DelegatedCharacteristicAdapter,
34    PackedCharacteristicAdapter,
35)
36
37
38# -----------------------------------------------------------------------------
class HeartRateService(TemplateService):
    """
    Server-side Heart Rate service.

    The service always exposes a Heart Rate Measurement characteristic
    (notify property). A Body Sensor Location characteristic (read) and a
    Heart Rate Control Point characteristic (write) are only added when the
    corresponding constructor argument is supplied; the matching attributes
    are `None` otherwise.
    """

    UUID = GATT_HEART_RATE_SERVICE
    # `struct` format of the Heart Rate Control Point value: one unsigned byte
    HEART_RATE_CONTROL_POINT_FORMAT = 'B'
    # ATT error code raised when an unsupported control point value is written
    CONTROL_POINT_NOT_SUPPORTED = 0x80
    # Control point value that triggers the `reset_energy_expended` callback
    RESET_ENERGY_EXPENDED = 0x01

    class BodySensorLocation(IntEnum):
        """Values carried by the Body Sensor Location characteristic."""

        OTHER = 0
        CHEST = 1
        WRIST = 2
        FINGER = 3
        HAND = 4
        EAR_LOBE = 5
        FOOT = 6

    class HeartRateMeasurement:
        """
        Value of the Heart Rate Measurement characteristic.

        Serialized form: one flags byte, the heart rate (1 or 2 bytes), then
        the optional energy expended (2 bytes, little-endian) and the optional
        RR intervals (2 bytes each, little-endian, in 1/1024ths of the unit
        used by `rr_intervals`).
        """

        # Bits of the leading flags byte, as read by `from_bytes` and written
        # by `__bytes__`.
        _FLAG_HEART_RATE_16_BITS = 1 << 0
        _FLAG_SENSOR_CONTACT_DETECTED = 1 << 1
        _FLAG_SENSOR_CONTACT_REPORTED = 1 << 2
        _FLAG_ENERGY_EXPENDED_PRESENT = 1 << 3
        _FLAG_RR_INTERVALS_PRESENT = 1 << 4

        def __init__(
            self,
            heart_rate,
            sensor_contact_detected=None,
            energy_expended=None,
            rr_intervals=None,
        ):
            """
            Args:
              heart_rate: heart rate value, in the range 0..0xFFFF.
              sensor_contact_detected: True/False when the sensor contact
                status is reported, None when it is not.
              energy_expended: energy expended in the range 0..0xFFFF, or None
                when not reported.
              rr_intervals: iterable of RR intervals, or None/empty when not
                reported. Each interval multiplied by 1024 must fit in 16 bits
                (presumably the intervals are in seconds -- confirm against the
                Heart Rate Service specification).

            Raises:
              core.InvalidArgumentError: if any value is out of range.
            """
            if heart_rate < 0 or heart_rate > 0xFFFF:
                raise core.InvalidArgumentError('heart_rate out of range')

            if energy_expended is not None and (
                energy_expended < 0 or energy_expended > 0xFFFF
            ):
                raise core.InvalidArgumentError('energy_expended out of range')

            if rr_intervals:
                for rr_interval in rr_intervals:
                    if rr_interval < 0 or rr_interval * 1024 > 0xFFFF:
                        raise core.InvalidArgumentError('rr_intervals out of range')

            self.heart_rate = heart_rate
            self.sensor_contact_detected = sensor_contact_detected
            self.energy_expended = energy_expended
            self.rr_intervals = rr_intervals

        @classmethod
        def from_bytes(cls, data):
            """
            Parse a serialized Heart Rate Measurement.

            Args:
              data: bytes-like object starting with the flags byte.

            Returns:
              A new measurement. `rr_intervals` is an empty tuple when the
              RR intervals flag is not set.
            """
            flags = data[0]
            offset = 1

            # The heart rate is either an unsigned 16-bit or 8-bit value
            if flags & cls._FLAG_HEART_RATE_16_BITS:
                heart_rate = struct.unpack_from('<H', data, offset)[0]
                offset += 2
            else:
                heart_rate = struct.unpack_from('B', data, offset)[0]
                offset += 1

            # The "detected" bit is only meaningful when the status is reported
            if flags & cls._FLAG_SENSOR_CONTACT_REPORTED:
                sensor_contact_detected = (
                    flags & cls._FLAG_SENSOR_CONTACT_DETECTED
                ) != 0
            else:
                sensor_contact_detected = None

            if flags & cls._FLAG_ENERGY_EXPENDED_PRESENT:
                energy_expended = struct.unpack_from('<H', data, offset)[0]
                offset += 2
            else:
                energy_expended = None

            if flags & cls._FLAG_RR_INTERVALS_PRESENT:
                # All the remaining complete 16-bit words are RR intervals;
                # a trailing odd byte, if any, is ignored.
                rr_intervals = tuple(
                    struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
                    for i in range((len(data) - offset) // 2)
                )
            else:
                rr_intervals = ()

            return cls(
                heart_rate, sensor_contact_detected, energy_expended, rr_intervals
            )

        def __bytes__(self):
            """Serialize to the format parsed by `from_bytes`."""
            # Use the compact 8-bit heart rate encoding whenever the value fits
            if self.heart_rate < 256:
                flags = 0
                data = struct.pack('B', self.heart_rate)
            else:
                flags = self._FLAG_HEART_RATE_16_BITS
                data = struct.pack('<H', self.heart_rate)

            if self.sensor_contact_detected is not None:
                flags |= self._FLAG_SENSOR_CONTACT_REPORTED
                if self.sensor_contact_detected:
                    flags |= self._FLAG_SENSOR_CONTACT_DETECTED

            if self.energy_expended is not None:
                flags |= self._FLAG_ENERGY_EXPENDED_PRESENT
                data += struct.pack('<H', self.energy_expended)

            if self.rr_intervals:
                flags |= self._FLAG_RR_INTERVALS_PRESENT
                data += b''.join(
                    struct.pack('<H', int(rr_interval * 1024))
                    for rr_interval in self.rr_intervals
                )

            return bytes([flags]) + data

        def __str__(self):
            return (
                f'HeartRateMeasurement(heart_rate={self.heart_rate},'
                f' sensor_contact_detected={self.sensor_contact_detected},'
                f' energy_expended={self.energy_expended},'
                f' rr_intervals={self.rr_intervals})'
            )

    def __init__(
        self,
        read_heart_rate_measurement,
        body_sensor_location=None,
        reset_energy_expended=None,
    ):
        """
        Args:
          read_heart_rate_measurement: callable used as the `read` function of
            the Heart Rate Measurement characteristic value; its result is
            converted with `bytes()`.
          body_sensor_location: optional value convertible with `int()` (for
            example a `BodySensorLocation`). When not None, a Body Sensor
            Location characteristic is added.
          reset_energy_expended: optional callable invoked with the connection
            when `RESET_ENERGY_EXPENDED` is written to the control point. When
            provided, a Heart Rate Control Point characteristic is added.
        """
        self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
            Characteristic(
                GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
                Characteristic.Properties.NOTIFY,
                0,
                CharacteristicValue(read=read_heart_rate_measurement),
            ),
            # pylint: disable=unnecessary-lambda
            encode=lambda value: bytes(value),
        )
        characteristics = [self.heart_rate_measurement_characteristic]

        # Always define the optional characteristic attributes, so that callers
        # can test them against None instead of getting an AttributeError.
        self.body_sensor_location_characteristic = None
        self.heart_rate_control_point_characteristic = None

        if body_sensor_location is not None:
            self.body_sensor_location_characteristic = Characteristic(
                GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
                Characteristic.Properties.READ,
                Characteristic.READABLE,
                bytes([int(body_sensor_location)]),
            )
            characteristics.append(self.body_sensor_location_characteristic)

        if reset_energy_expended:

            def write_heart_rate_control_point_value(connection, value):
                # Resetting the energy expended is the only supported command
                if value != self.RESET_ENERGY_EXPENDED:
                    raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
                reset_energy_expended(connection)

            self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
                Characteristic(
                    GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
                    Characteristic.Properties.WRITE,
                    Characteristic.WRITEABLE,
                    CharacteristicValue(write=write_heart_rate_control_point_value),
                ),
                pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
            )
            characteristics.append(self.heart_rate_control_point_characteristic)

        super().__init__(characteristics)
196
197# -----------------------------------------------------------------------------
class HeartRateServiceProxy(ProfileServiceProxy):
    """
    Client-side proxy for a remote Heart Rate service.

    Each of `heart_rate_measurement`, `body_sensor_location` and
    `heart_rate_control_point` is an adapter around the first matching
    characteristic of the wrapped service proxy, or `None` when the service
    proxy returns no characteristic for the corresponding UUID.
    """

    SERVICE_CLASS = HeartRateService

    def __init__(self, service_proxy):
        """
        Args:
          service_proxy: proxy of the remote service, queried with
            `get_characteristics_by_uuid` for each characteristic.
        """
        self.service_proxy = service_proxy

        # Heart Rate Measurement: decoded into a HeartRateMeasurement object
        measurement_matches = service_proxy.get_characteristics_by_uuid(
            GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
        )
        self.heart_rate_measurement = (
            DelegatedCharacteristicAdapter(
                measurement_matches[0],
                decode=HeartRateService.HeartRateMeasurement.from_bytes,
            )
            if measurement_matches
            else None
        )

        # Body Sensor Location: the first byte is mapped to the enum
        def decode_body_sensor_location(raw_value):
            return HeartRateService.BodySensorLocation(raw_value[0])

        location_matches = service_proxy.get_characteristics_by_uuid(
            GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC
        )
        self.body_sensor_location = (
            DelegatedCharacteristicAdapter(
                location_matches[0],
                decode=decode_body_sensor_location,
            )
            if location_matches
            else None
        )

        # Heart Rate Control Point: packed with the service's struct format
        control_point_matches = service_proxy.get_characteristics_by_uuid(
            GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC
        )
        self.heart_rate_control_point = (
            PackedCharacteristicAdapter(
                control_point_matches[0],
                pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
            )
            if control_point_matches
            else None
        )

    async def reset_energy_expended(self):
        """
        Write `RESET_ENERGY_EXPENDED` to the Heart Rate Control Point.

        Returns:
          The result of the characteristic write, or None (without writing
          anything) when there is no control point characteristic.
        """
        control_point = self.heart_rate_control_point
        if control_point is None:
            return None

        return await control_point.write_value(HeartRateService.RESET_ENERGY_EXPENDED)