1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import enum
20from dataclasses import dataclass
21from typing import Optional, Tuple
22
23from .hci import (
24    Address,
25    HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
26    HCI_DISPLAY_ONLY_IO_CAPABILITY,
27    HCI_DISPLAY_YES_NO_IO_CAPABILITY,
28    HCI_KEYBOARD_ONLY_IO_CAPABILITY,
29)
30from .smp import (
31    SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
32    SMP_KEYBOARD_ONLY_IO_CAPABILITY,
33    SMP_DISPLAY_ONLY_IO_CAPABILITY,
34    SMP_DISPLAY_YES_NO_IO_CAPABILITY,
35    SMP_KEYBOARD_DISPLAY_IO_CAPABILITY,
36    SMP_ENC_KEY_DISTRIBUTION_FLAG,
37    SMP_ID_KEY_DISTRIBUTION_FLAG,
38    SMP_SIGN_KEY_DISTRIBUTION_FLAG,
39    SMP_LINK_KEY_DISTRIBUTION_FLAG,
40    OobContext,
41    OobLegacyContext,
42    OobSharedData,
43)
44from .core import AdvertisingData, LeRole
45
46
47# -----------------------------------------------------------------------------
@dataclass
class OobData:
    """Out-of-band pairing data that one device can hand over to another.

    Every field is optional. A field left as `None` is not emitted by `to_ad`.
    """

    address: Optional[Address] = None
    role: Optional[LeRole] = None
    shared_data: Optional[OobSharedData] = None
    legacy_context: Optional[OobLegacyContext] = None

    @classmethod
    def from_ad(cls, ad: AdvertisingData) -> OobData:
        """Build an instance from the AD structures contained in `ad`.

        Structures whose type is not handled here are ignored. If a handled
        type appears several times, the last occurrence wins. `shared_data` is
        only populated when both the confirmation value and the random value
        were found and are non-empty.
        """
        oob = cls()
        confirmation: Optional[bytes] = None
        random_value: Optional[bytes] = None

        for kind, payload in ad.ad_structures:
            if kind == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
                oob.address = Address(payload)
                continue
            if kind == AdvertisingData.LE_ROLE:
                # Only the first byte of the payload is used.
                oob.role = LeRole(payload[0])
                continue
            if kind == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
                confirmation = payload
                continue
            if kind == AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE:
                random_value = payload
                continue
            if kind == AdvertisingData.SECURITY_MANAGER_TK_VALUE:
                oob.legacy_context = OobLegacyContext(tk=payload)

        # The shared data needs both halves; a lone value is dropped.
        if not (confirmation and random_value):
            return oob

        oob.shared_data = OobSharedData(c=confirmation, r=random_value)
        return oob

    def to_ad(self) -> AdvertisingData:
        """Serialize the fields that are set into an `AdvertisingData` object.

        Structures are emitted in this order: address, role, shared data,
        legacy TK value.
        """
        structures = []

        address = self.address
        if address is not None:
            structures += [
                (AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS, bytes(address))
            ]

        role = self.role
        if role is not None:
            # The role is encoded as a single byte.
            structures += [(AdvertisingData.LE_ROLE, bytes([role]))]

        shared = self.shared_data
        if shared is not None:
            # Reuse the shared data's own serialization and splice it in.
            structures += shared.to_ad().ad_structures

        legacy = self.legacy_context
        if legacy is not None:
            structures += [(AdvertisingData.SECURITY_MANAGER_TK_VALUE, legacy.tk)]

        return AdvertisingData(structures)
94
95
96# -----------------------------------------------------------------------------
class PairingDelegate:
    """Base class for Pairing Delegates.

    Every method has a default implementation that answers without any user
    interaction; subclasses override the ones they care about.
    """

    # Abstract I/O capabilities. The member values are the SMP constants, and
    # CLASSIC_IO_CAPABILITIES_MAP below translates them to Classic constants.
    class IoCapability(enum.IntEnum):
        NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
        KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
        DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
        DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
        DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY

    # Class-level aliases of the enum members, kept for backward compatibility.
    NO_OUTPUT_NO_INPUT = IoCapability.NO_OUTPUT_NO_INPUT
    KEYBOARD_INPUT_ONLY = IoCapability.KEYBOARD_INPUT_ONLY
    DISPLAY_OUTPUT_ONLY = IoCapability.DISPLAY_OUTPUT_ONLY
    DISPLAY_OUTPUT_AND_YES_NO_INPUT = IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT
    DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT

    # Key distribution flags [LE only]; the values are the SMP flag constants.
    class KeyDistribution(enum.IntFlag):
        DISTRIBUTE_ENCRYPTION_KEY = SMP_ENC_KEY_DISTRIBUTION_FLAG
        DISTRIBUTE_IDENTITY_KEY = SMP_ID_KEY_DISTRIBUTION_FLAG
        DISTRIBUTE_SIGNING_KEY = SMP_SIGN_KEY_DISTRIBUTION_FLAG
        DISTRIBUTE_LINK_KEY = SMP_LINK_KEY_DISTRIBUTION_FLAG

    # By default, only the encryption key and the identity key are distributed.
    DEFAULT_KEY_DISTRIBUTION: KeyDistribution = (
        KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
        | KeyDistribution.DISTRIBUTE_IDENTITY_KEY
    )

    # Default translation from abstract to Classic I/O capabilities.
    # Subclasses may replace this table if they prefer a different mapping.
    CLASSIC_IO_CAPABILITIES_MAP = {
        IoCapability.NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
        IoCapability.KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
        IoCapability.DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
        IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
        # Keyboard+display is mapped to the display/yes-no Classic constant.
        IoCapability.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
    }

    io_capability: IoCapability
    local_initiator_key_distribution: KeyDistribution
    local_responder_key_distribution: KeyDistribution

    def __init__(
        self,
        io_capability: IoCapability = IoCapability.NO_OUTPUT_NO_INPUT,
        local_initiator_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION,
        local_responder_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION,
    ) -> None:
        """Store the I/O capability and the local key distribution masks.

        Args:
            io_capability: abstract I/O capability of the local device.
            local_initiator_key_distribution: local key distribution mask that
                is combined with the peer's initiator mask in
                `key_distribution_response`.
            local_responder_key_distribution: local key distribution mask that
                is combined with the peer's responder mask in
                `key_distribution_response`.
        """
        self.local_responder_key_distribution = local_responder_key_distribution
        self.local_initiator_key_distribution = local_initiator_key_distribution
        self.io_capability = io_capability

    @property
    def classic_io_capability(self) -> int:
        """Classic constant corresponding to the abstract I/O capability.

        Falls back to the "no input, no output" Classic constant when the
        capability has no entry in `CLASSIC_IO_CAPABILITIES_MAP`.
        """
        table = self.CLASSIC_IO_CAPABILITIES_MAP
        return table.get(self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY)

    @property
    def smp_io_capability(self) -> int:
        """SMP constant corresponding to the abstract I/O capability."""

        # The abstract values are the SMP values, so no translation is needed.
        return self.io_capability

    async def accept(self) -> bool:
        """Accept (True) or reject (False) a Pairing request."""
        return True

    async def confirm(self, auto: bool = False) -> bool:
        """
        Answer yes (True) or no (False) to a Pairing confirmation question.
        The `auto` parameter stands for automatic confirmation.
        """
        return True

    # pylint: disable-next=unused-argument
    async def compare_numbers(self, number: int, digits: int) -> bool:
        """Compare two numbers; the default answers True."""
        return True

    async def get_number(self) -> Optional[int]:
        """
        Provide the number that answers a passkey request.
        Returning `None` will result in a negative reply.
        """
        return 0

    async def get_string(self, max_length: int) -> Optional[str]:
        """
        Provide a string whose utf-8 encoding is up to max_length bytes.
        The default implementation provides no string (None).
        """
        return None

    # pylint: disable-next=unused-argument
    async def display_number(self, number: int, digits: int) -> None:
        """Display a number; the default does nothing."""

    # [LE only]
    async def key_distribution_response(
        self, peer_initiator_key_distribution: int, peer_responder_key_distribution: int
    ) -> Tuple[int, int]:
        """
        Compute the key distribution response in an SMP protocol context.

        Each peer mask is AND-ed with the matching local mask, and the result
        is returned as an `(initiator, responder)` pair.

        NOTE: since it is only used by the SMP protocol, this method's input and
        output are plain integers, using the SMP constants, rather than the
        abstract KeyDistribution enums.
        """
        initiator_keys = int(
            peer_initiator_key_distribution & self.local_initiator_key_distribution
        )
        responder_keys = int(
            peer_responder_key_distribution & self.local_responder_key_distribution
        )
        return initiator_keys, responder_keys
221
222
223# -----------------------------------------------------------------------------
class PairingConfig:
    """Configuration for the Pairing protocol."""

    class AddressType(enum.IntEnum):
        """Identity address types, valued with the `Address` type constants."""

        PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
        RANDOM = Address.RANDOM_DEVICE_ADDRESS

    @dataclass
    class OobConfig:
        """Config for OOB pairing."""

        our_context: Optional[OobContext]
        peer_data: Optional[OobSharedData]
        legacy_context: Optional[OobLegacyContext]

    def __init__(
        self,
        sc: bool = True,
        mitm: bool = True,
        bonding: bool = True,
        delegate: Optional[PairingDelegate] = None,
        identity_address_type: Optional[AddressType] = None,
        oob: Optional[OobConfig] = None,
    ) -> None:
        """Store the pairing options.

        Args:
            sc: presumably "secure connections" -- TODO confirm against the
                code that consumes this flag.
            mitm: presumably "MITM protection" -- TODO confirm as above.
            bonding: presumably whether bonding is requested -- TODO confirm.
            delegate: pairing delegate to use; a default `PairingDelegate()`
                is created when `None` is passed.
            identity_address_type: optional identity address type.
            oob: optional OOB pairing configuration.
        """
        self.sc = sc
        self.mitm = mitm
        self.bonding = bonding
        # Test for None explicitly, so that a delegate object that happens to
        # be falsy is not silently replaced by the default delegate.
        self.delegate = delegate if delegate is not None else PairingDelegate()
        self.identity_address_type = identity_address_type
        self.oob = oob

    def __str__(self) -> str:
        """Return a one-line, human-readable summary of the configuration."""
        # The delegate is summarized by its I/O capability only. The closing
        # parenthesis appears once, at the very end, so the text is balanced.
        return (
            f'PairingConfig(sc={self.sc}, '
            f'mitm={self.mitm}, bonding={self.bonding}, '
            f'identity_address_type={self.identity_address_type}, '
            f'delegate[{self.delegate.io_capability}], '
            f'oob[{self.oob}])'
        )
263