xref: /aosp_15_r20/external/autotest/client/cros/cellular/mbim_compliance/mbim_channel.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import multiprocessing
7import struct
8import time
9
10import common
11from six.moves.queue import Empty
12
13from autotest_lib.client.bin import utils
14from autotest_lib.client.cros.cellular.mbim_compliance import \
15    mbim_channel_endpoint
16from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
17
18
class MBIMChannel(object):
    """
    Provide synchronous access to the modem with MBIM command level interaction.

    This object should simplify your interaction over the MBIM channel as
    follows:
    - Use |bidirectional_transaction| to send MBIM packets that are part of a
      transaction. This function will block until the transaction completes and
      return the MBIM packets received in response.
    - |bidirectional_transaction| will filter out packets that do not correspond
      to your transaction. This way, you don't have to worry about unsolicited
      notifications and/or stale packets when interacting with the modem.
    - All filtered out packets can be grabbed using the
      |get_outstanding_packets| function. Use this function to receive error
      notifications, status notifications, etc.
    - Use |unidirectional_transaction| to send MBIM packets for which you don't
      expect a response.
    - Use |flush| to clean out all pipes before starting a new transaction.
    - Use |close| when done. Once closed, every other public method raises
      MBIMComplianceChannelError.

    Note that "MBIM packets" here really means MBIM fragments. This object does
    not (de)fragment packets for you. Out of necessity, it does check that
    received fragments are contiguous and in-order.

    So, this object houses the minimum information necessary about the MBIM
    fragments to provide you a comfortable synchronous packet level channel.

    """

    # How long |close| waits for the endpoint subprocess to exit on its own
    # before terminating it.
    ENDPOINT_JOIN_TIMEOUT_S = 5
    # How long we wait for a single fragment to show up on the response queue.
    FRAGMENT_TIMEOUT_S = 3
    # TODO(pprabhu) Consider allowing each transaction to specify its own
    # timeout.
    TRANSACTION_TIMEOUT_S = 5
    # |flush| never sleeps for more than this many transaction timeouts.
    MAX_FLUSH_TRANSACTIONS = 5

    # Message header: three little-endian uint32 values. Only the first
    # (message type) and the third (transaction id) are used here.
    MESSAGE_HEADER_FORMAT = '<LLL'
    # Fragment header: little-endian uint32 total fragments, current fragment.
    FRAGMENT_HEADER_FORMAT = '<LL'
    # Message types that carry a fragment header after the message header.
    MBIM_FRAGMENTED_MESSAGES = [
            0x00000003,  # MBIM_COMMAND_MSG
            0x80000003,  # MBIM_COMMAND_DONE
            0x80000007]  # MBIM_INDICATE_STATUS

    def __init__(self,
                 device,
                 interface_number,
                 interrupt_endpoint_address,
                 in_buffer_size,
                 process_class=None):
        """
        @param device: Device handle returned by PyUSB for the modem to test.
        @param interface_number: |bInterfaceNumber| of the MBIM interface.
        @param interrupt_endpoint_address: |bEndpointAddress| for the usb
                INTERRUPT IN endpoint for notifications.
        @param in_buffer_size: The (fixed) buffer size to use for in control
                transfers.
        @param process_class: The class to instantiate to create a subprocess.
                This is used by tests only, to easily mock out the process
                creation.

        """
        # Assign this first so that |close| (and therefore |__del__|) is safe
        # even if anything below raises before the subprocess exists.
        self._endpoint_process = None
        self._stop_request_event = multiprocessing.Event()
        self._request_queue = multiprocessing.Queue()
        self._response_queue = multiprocessing.Queue()
        # Packets received that did not belong to the transaction in progress.
        self._outstanding_packets = []
        self._last_response = []
        # First fragment of the next packet, when it arrived while we were
        # still collecting fragments of the previous packet.
        self._stashed_first_fragment = None
        if process_class is None:
            process_class = multiprocessing.Process
        self._endpoint_process = process_class(
                target=mbim_channel_endpoint.MBIMChannelEndpoint,
                args=(device,
                      interface_number,
                      interrupt_endpoint_address,
                      in_buffer_size,
                      self._request_queue,
                      self._response_queue,
                      self._stop_request_event))
        self._endpoint_process.start()


    def __del__(self):
        """
        The destructor.

        Note that it is not guaranteed that |__del__| is called for objects that
        exist when the interpreter exits. It is recommended to call |close|
        explicitly.

        """
        self.close()


    def close(self):
        """
        Cleanly close the MBIMChannel.

        MBIMChannel forks a subprocess to communicate with the USB device. It is
        recommended that |close| be called explicitly.

        The subprocess is first asked to stop via the stop event and given
        |ENDPOINT_JOIN_TIMEOUT_S| seconds to exit; if it is still alive after
        that, it is terminated. Calling |close| more than once, or on an object
        whose construction did not complete, is a no-op.

        """
        # |getattr| guards against objects on which |__init__| never ran far
        # enough to create the attribute; |__del__| still ends up here.
        endpoint_process = getattr(self, '_endpoint_process', None)
        if not endpoint_process:
            return

        if endpoint_process.is_alive():
            self._stop_request_event.set()
            endpoint_process.join(self.ENDPOINT_JOIN_TIMEOUT_S)
            if endpoint_process.is_alive():
                endpoint_process.terminate()

        self._endpoint_process = None


    def bidirectional_transaction(self, *args):
        """
        Execute a synchronous bidirectional transaction.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @returns: A list of fragments in the same order as received that
                correspond to the given transaction. If we receive less
                fragments than claimed, we will return what we get. If we
                receive non-contiguous / out-of-order fragments, we'll complain.
        @raises: MBIMComplianceChannelError if no fragments are given, if no
                timely response is received, or if received fragments are
                out-of-order or non-contiguous.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |bidirectional_transaction|.')

        # Parse the transaction id before queueing anything, so that a corrupt
        # first fragment is rejected without sending any data to the modem.
        transaction_id, _, _ = self._fragment_metadata(args[0])
        for fragment in args:
            self._request_queue.put_nowait(fragment)
        return self._get_response_fragments(transaction_id)


    def unidirectional_transaction(self, *args):
        """
        Execute a synchronous unidirectional transaction. No return value.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @raises: MBIMComplianceChannelError if no fragments are given.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |unidirectional_transaction|.')

        for fragment in args:
            self._request_queue.put_nowait(fragment)


    def flush(self):
        """
        Clean out all queues.

        This waits till all outgoing packets have been sent, and then waits some
        more to give the channel time to settle down.

        @raises: MBIMComplianceChannelError if things don't settle down fast
                enough.
        """
        self._verify_endpoint_open()
        num_remaining_fragments = self._request_queue.qsize()
        try:
            timeout = self.FRAGMENT_TIMEOUT_S * num_remaining_fragments
            utils.poll_for_condition(self._request_queue.empty,
                                     timeout=timeout)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Could not flush request queue.')

        # Now wait for the response queue to settle down.
        # In the worst case, each request fragment that was remaining at the
        # time flush was called belonged to a different transaction, and each of
        # these transactions would serially timeout in |TRANSACTION_TIMEOUT_S|.
        # To avoid sleeping for long times, we cap this value arbitrarily to
        # |MAX_FLUSH_TRANSACTIONS| transactions.
        num_remaining_transactions = min(self.MAX_FLUSH_TRANSACTIONS,
                                         num_remaining_fragments)
        # Sleep on the *capped* count; sleeping on the raw fragment count
        # would defeat the cap computed above.
        time.sleep(num_remaining_transactions * self.TRANSACTION_TIMEOUT_S)
        extra_packets = self.get_outstanding_packets()
        for packet in extra_packets:
            logging.debug('flush: discarding packet: %s', packet)


    def get_outstanding_packets(self):
        """
        Get all received packets that were not part of an explicit transaction.

        @returns: A list of packets. Each packet is a list of fragments, so you
        perhaps want to do something like:
            for packet in channel.get_outstanding_packets():
                for fragment in packet:
                    # handle fragment.

        """
        self._verify_endpoint_open()
        # Try to get more packets from the response queue.
        # This can block forever if the modem keeps spewing trash at us.
        while True:
            packet = self._get_packet_fragments()
            if not packet:
                break
            self._outstanding_packets.append(packet)

        # Hand over the accumulated packets and start a fresh list.
        packets = self._outstanding_packets
        self._outstanding_packets = []
        return packets


    def _get_response_fragments(self, transaction_id):
        """
        Get response for the given |transaction_id|.

        Packets received for any other transaction id are stored so that
        |get_outstanding_packets| can return them later.

        @param transaction_id: Transaction id the response must carry.
        @returns: A list of fragments.
        @raises: MBIMComplianceChannelError if response is not received.

        """
        def _poll_response():
            packet = self._get_packet_fragments()
            if not packet:
                return False
            first_fragment = packet[0]
            response_id, _, _ = self._fragment_metadata(first_fragment)
            if response_id == transaction_id:
                self._last_response = packet
                return True
            # Not ours: keep it for |get_outstanding_packets|.
            self._outstanding_packets.append(packet)
            return False

        try:
            utils.poll_for_condition(
                    _poll_response,
                    timeout=self.TRANSACTION_TIMEOUT_S)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Did not receive timely reply to transaction %d' %
                    transaction_id)
        return self._last_response


    def _get_packet_fragments(self):
        """
        Get all fragments of the next packet from the modem.

        This function is responsible for putting together fragments of one
        packet, and checking that fragments are contiguous and in-order.

        @returns: A list of the fragments of the next packet, in order. The
                list is empty if nothing arrived within |FRAGMENT_TIMEOUT_S|,
                and may be shorter than the advertised total if the remaining
                fragments did not arrive in time or a fragment of a different
                transaction arrived first.
        @raises: MBIMComplianceChannelError if the first fragment is not
                numbered 0, or if a later fragment reports a different total
                or is out of order.

        """
        fragments = []
        if self._stashed_first_fragment is not None:
            # A previous call already pulled this fragment off the queue.
            first_fragment = self._stashed_first_fragment
            self._stashed_first_fragment = None
        else:
            try:
                first_fragment = self._response_queue.get(
                        True, self.FRAGMENT_TIMEOUT_S)
            except Empty:
                # *Don't fail* Just return nothing.
                return fragments

        transaction_id, total_fragments, current_fragment = (
                self._fragment_metadata(first_fragment))
        if current_fragment != 0:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'First fragment reports fragment number %d' %
                    current_fragment)

        fragments.append(first_fragment)

        # Index of the most recently accepted fragment.
        last_fragment = 0
        while last_fragment < total_fragments - 1:
            try:
                fragment = self._response_queue.get(True,
                                                    self.FRAGMENT_TIMEOUT_S)
            except Empty:
                # *Don't fail* Just return the fragments we got so far.
                break

            fragment_id, fragment_total, fragment_current = (
                    self._fragment_metadata(fragment))
            if fragment_id != transaction_id:
                # *Don't fail* Treat a different transaction id as indicating
                # that the next packet has already arrived.
                # Report the number of fragments actually collected;
                # |last_fragment| is a zero-based index and would be one short.
                logging.warning('Recieved only %d out of %d fragments for '
                                'transaction %d.',
                                len(fragments),
                                total_fragments,
                                transaction_id)
                self._stashed_first_fragment = fragment
                break

            if fragment_total != total_fragments:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Fragment number %d reports incorrect total (%d/%d)' %
                        (last_fragment + 1, fragment_total, total_fragments))

            if fragment_current != last_fragment + 1:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Received reordered fragments. Expected %d, got %d' %
                        (last_fragment + 1, fragment_current))

            last_fragment += 1
            fragments.append(fragment)

        return fragments


    def _fragment_metadata(self, fragment):
        """
        This function houses all the MBIM packet knowledge.

        @param fragment: A sliceable buffer holding one fragment, starting
                with the message header.
        @returns: Tuple (transaction_id, total_fragments, current_fragment).
                Messages whose type is not in |MBIM_FRAGMENTED_MESSAGES| are
                reported as a single fragment numbered 0.
        @raises: MBIMComplianceChannelError if |fragment| is too short to
                contain the headers it must have.

        """
        # All packets have a message header.
        message_header_size = struct.calcsize(self.MESSAGE_HEADER_FORMAT)
        if len(fragment) < message_header_size:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Corrupted fragment |%s| does not have an MBIM header.' %
                    fragment)

        # The middle header field is not needed here.
        message_type, _, transaction_id = struct.unpack_from(
                self.MESSAGE_HEADER_FORMAT,
                fragment)

        if message_type in self.MBIM_FRAGMENTED_MESSAGES:
            # The fragment header immediately follows the message header.
            fragment = fragment[message_header_size:]
            if len(fragment) < struct.calcsize(self.FRAGMENT_HEADER_FORMAT):
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Corrupted fragment |%s| does not have a fragment '
                        'header. ' %
                        fragment)

            total_fragments, current_fragment = struct.unpack_from(
                    self.FRAGMENT_HEADER_FORMAT,
                    fragment)
        else:
            # For other types, there is only one 'fragment'.
            total_fragments = 1
            current_fragment = 0

        return transaction_id, total_fragments, current_fragment


    def _verify_endpoint_open(self):
        """
        Check that the endpoint subprocess exists and is still running.

        @raises: MBIMComplianceChannelError if the channel was closed (or never
                fully constructed), or if the subprocess died.

        """
        endpoint_process = getattr(self, '_endpoint_process', None)
        if not endpoint_process:
            # Without this check a closed channel would fail with an
            # AttributeError on |None| instead of a channel error.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannel is closed; no MBIMChannelEndpoint process is '
                    'running.')
        elif not endpoint_process.is_alive():
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannelEndpoint died unexpectedly. '
                    'The actual exception can be found in log entries from the '
                    'subprocess.')
385