# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import multiprocessing
import struct
import time

import common
from six.moves.queue import Empty

from autotest_lib.client.bin import utils
from autotest_lib.client.cros.cellular.mbim_compliance import \
        mbim_channel_endpoint
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors


class MBIMChannel(object):
    """
    Provide synchronous access to the modem with MBIM command level interaction.

    This object should simplify your interaction over the MBIM channel as
    follows:
    - Use |bidirectional_transaction| to send MBIM packets that are part of a
      transaction. This function will block until the transaction completes and
      return the MBIM packets received in response.
    - |bidirectional_transaction| will filter out packets that do not correspond
      to your transaction. This way, you don't have to worry about unsolicited
      notifications and/or stale packets when interacting with the modem.
    - All filtered out packets can be grabbed using the
      |get_outstanding_packets| function. Use this function to receive error
      notifications, status notifications, etc.
    - Use |unidirectional_transaction| to send MBIM packets for which you don't
      expect a response.
    - Use |flush| to clean out all pipes before starting a new transaction.

    Note that "MBIM packets" here really means MBIM fragments. This object does
    not (de)fragment packets for you. Out of necessity, it does check that
    received fragments are contiguous and in-order.

    So, this object houses the minimum information necessary about the MBIM
    fragments to provide you a comfortable synchronous packet level channel.

    """

    ENDPOINT_JOIN_TIMEOUT_S = 5
    FRAGMENT_TIMEOUT_S = 3
    # TODO(pprabhu) Consider allowing each transaction to specify its own
    # timeout.
    TRANSACTION_TIMEOUT_S = 5
    # Upper bound on the number of transaction timeouts |flush| will sleep for
    # while waiting for the response queue to settle down.
    FLUSH_MAX_TRANSACTIONS = 5

    # Little-endian: message type, message length, transaction id.
    MESSAGE_HEADER_FORMAT = '<LLL'
    # Little-endian: total fragments, current fragment.
    FRAGMENT_HEADER_FORMAT = '<LL'
    # Message types that carry a fragment header after the message header.
    MBIM_FRAGMENTED_MESSAGES = [
            0x00000003,  # MBIM_COMMAND_MSG
            0x80000003,  # MBIM_COMMAND_DONE
            0x80000007]  # MBIM_INDICATE_STATUS

    def __init__(self,
                 device,
                 interface_number,
                 interrupt_endpoint_address,
                 in_buffer_size,
                 process_class=None):
        """
        @param device: Device handle returned by PyUSB for the modem to test.
        @param interface_number: |bInterfaceNumber| of the MBIM interface.
        @param interrupt_endpoint_address: |bEndpointAddress| for the usb
                INTERRUPT IN endpoint for notifications.
        @param in_buffer_size: The (fixed) buffer size to use for in control
                transfers.
        @param process_class: The class to instantiate to create a subprocess.
                This is used by tests only, to easily mock out the process
                creation.

        """
        self._stop_request_event = multiprocessing.Event()
        self._request_queue = multiprocessing.Queue()
        self._response_queue = multiprocessing.Queue()
        # Packets received that did not belong to an explicit transaction.
        self._outstanding_packets = []
        # Scratch slot used by |_get_response_fragments| to hand the matched
        # packet out of its polling closure.
        self._last_response = []
        # First fragment of the *next* packet, read while assembling the
        # previous (truncated) one.
        self._stashed_first_fragment = None
        if process_class is None:
            process_class = multiprocessing.Process
        self._endpoint_process = process_class(
                target=mbim_channel_endpoint.MBIMChannelEndpoint,
                args=(device,
                      interface_number,
                      interrupt_endpoint_address,
                      in_buffer_size,
                      self._request_queue,
                      self._response_queue,
                      self._stop_request_event))
        self._endpoint_process.start()


    def __del__(self):
        """
        The destructor.

        Note that it is not guaranteed that |__del__| is called for objects that
        exist when the interpreter exits. It is recommended to call |close|
        explicitly.

        """
        self.close()


    def close(self):
        """
        Cleanly close the MBIMChannel.

        MBIMChannel forks a subprocess to communicate with the USB device. It is
        recommended that |close| be called explicitly.

        Calling |close| more than once is harmless.

        """
        # |__del__| may run on a partially constructed object if |__init__|
        # raised before the subprocess was created, so don't assume the
        # attribute exists.
        endpoint_process = getattr(self, '_endpoint_process', None)
        if not endpoint_process:
            return

        if endpoint_process.is_alive():
            # Ask politely first; only terminate if the subprocess does not
            # exit within the join timeout.
            self._stop_request_event.set()
            endpoint_process.join(self.ENDPOINT_JOIN_TIMEOUT_S)
            if endpoint_process.is_alive():
                endpoint_process.terminate()

        self._endpoint_process = None


    def bidirectional_transaction(self, *args):
        """
        Execute a synchronous bidirectional transaction.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @returns: A list of fragments in the same order as received that
                correspond to the given transaction. If we receive less
                fragments than claimed, we will return what we get. If we
                receive non-contiguous / out-of-order fragments, we'll complain.
        @raises: MBIMComplianceChannelError if no fragments are given, if no
                timely response is received, or if received fragments are
                out-of-order or non-contiguous.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |bidirectional_transaction|.')

        # The transaction id of the first fragment identifies the response.
        transaction_id, _, _ = self._fragment_metadata(args[0])
        for fragment in args:
            self._request_queue.put_nowait(fragment)
        return self._get_response_fragments(transaction_id)


    def unidirectional_transaction(self, *args):
        """
        Execute a synchronous unidirectional transaction. No return value.

        @param *args: Fragments of a single MBIM transaction. An MBIM
                transaction may consist of multiple fragments - each fragment is
                the payload for a USB control message. It should be an
                |array.array| object.  It is your responsibility (and choice) to
                keep the fragments in-order, and to send all the fragments.
                For more details, see "Fragmentation of messages" in the MBIM
                spec.
        @raises: MBIMComplianceChannelError if no fragments are given.

        """
        self._verify_endpoint_open()
        if not args:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'No data given to |unidirectional_transaction|.')

        for fragment in args:
            self._request_queue.put_nowait(fragment)


    def flush(self):
        """
        Clean out all queues.

        This waits till all outgoing packets have been sent, and then waits some
        more to give the channel time to settle down.

        @raises: MBIMComplianceChannelError if things don't settle down fast
                enough.
        """
        self._verify_endpoint_open()
        num_remaining_fragments = self._request_queue.qsize()
        timeout = self.FRAGMENT_TIMEOUT_S * num_remaining_fragments
        try:
            utils.poll_for_condition(self._request_queue.empty,
                                     timeout=timeout)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Could not flush request queue.')

        # Now wait for the response queue to settle down.
        # In the worst case, each request fragment that was remaining at the
        # time flush was called belonged to a different transaction, and each of
        # these transactions would serially timeout in |TRANSACTION_TIMEOUT_S|.
        # To avoid sleeping for long times, we cap this value arbitrarily to
        # |FLUSH_MAX_TRANSACTIONS| transactions.
        num_remaining_transactions = min(self.FLUSH_MAX_TRANSACTIONS,
                                         num_remaining_fragments)
        time.sleep(num_remaining_transactions * self.TRANSACTION_TIMEOUT_S)
        extra_packets = self.get_outstanding_packets()
        for packet in extra_packets:
            logging.debug('flush: discarding packet: %s', packet)


    def get_outstanding_packets(self):
        """
        Get all received packets that were not part of an explicit transaction.

        @returns: A list of packets. Each packet is a list of fragments, so you
        perhaps want to do something like:
            for packet in channel.get_outstanding_packets():
                for fragment in packet:
                    # handle fragment.

        """
        self._verify_endpoint_open()
        # Try to get more packets from the response queue.
        # This can block forever if the modem keeps spewing trash at us.
        while True:
            packet = self._get_packet_fragments()
            if not packet:
                break
            self._outstanding_packets.append(packet)

        # Hand over the accumulated packets and start a fresh list.
        packets = self._outstanding_packets
        self._outstanding_packets = []
        return packets


    def _get_response_fragments(self, transaction_id):
        """
        Get response for the given |transaction_id|.

        Packets that belong to other transactions are stashed in
        |_outstanding_packets| rather than dropped.

        @param transaction_id: Transaction id of the request to wait for.
        @returns: A list of fragments.
        @raises: MBIMComplianceChannelError if response is not received.

        """
        def _poll_response():
            packet = self._get_packet_fragments()
            if not packet:
                return False
            first_fragment = packet[0]
            response_id, _, _ = self._fragment_metadata(first_fragment)
            if response_id == transaction_id:
                self._last_response = packet
                return True
            # Not ours: keep it for |get_outstanding_packets|.
            self._outstanding_packets.append(packet)
            return False

        try:
            utils.poll_for_condition(
                    _poll_response,
                    timeout=self.TRANSACTION_TIMEOUT_S)
        except utils.TimeoutError:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Did not receive timely reply to transaction %d' %
                    transaction_id)
        return self._last_response


    def _get_packet_fragments(self):
        """
        Get all fragments of the next packet from the modem.

        This function is responsible for putting together fragments of one
        packet, and checking that fragments are contiguous and in-order.

        @returns: A list of fragments of one packet, in order. The list is
                empty if nothing arrived within |FRAGMENT_TIMEOUT_S|, and may
                be shorter than the advertised total if the packet was
                truncated.
        @raises: MBIMComplianceChannelError if the fragments received are
                corrupted, inconsistent or out-of-order.

        """
        fragments = []
        if self._stashed_first_fragment is not None:
            # A previous call already pulled the start of this packet off the
            # queue while assembling a truncated packet.
            first_fragment = self._stashed_first_fragment
            self._stashed_first_fragment = None
        else:
            try:
                first_fragment = self._response_queue.get(
                        True, self.FRAGMENT_TIMEOUT_S)
            except Empty:
                # *Don't fail* Just return nothing.
                return fragments

        transaction_id, total_fragments, current_fragment = (
                self._fragment_metadata(first_fragment))
        if current_fragment != 0:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'First fragment reports fragment number %d' %
                    current_fragment)

        fragments.append(first_fragment)

        # Index of the most recently accepted fragment.
        last_fragment = 0
        while last_fragment < total_fragments - 1:
            try:
                fragment = self._response_queue.get(True,
                                                    self.FRAGMENT_TIMEOUT_S)
            except Empty:
                # *Don't fail* Just return the fragments we got so far.
                break

            fragment_id, fragment_total, fragment_current = (
                    self._fragment_metadata(fragment))
            if fragment_id != transaction_id:
                # *Don't fail* Treat a different transaction id as indicating
                # that the next packet has already arrived.
                # |last_fragment| is a zero-based index, so the number of
                # fragments received so far is one more than that.
                logging.warning('Recieved only %d out of %d fragments for '
                                'transaction %d.',
                                last_fragment + 1,
                                total_fragments,
                                transaction_id)
                self._stashed_first_fragment = fragment
                break

            if fragment_total != total_fragments:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Fragment number %d reports incorrect total (%d/%d)' %
                        (last_fragment + 1, fragment_total, total_fragments))

            if fragment_current != last_fragment + 1:
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Received reordered fragments. Expected %d, got %d' %
                        (last_fragment + 1, fragment_current))

            last_fragment += 1
            fragments.append(fragment)

        return fragments


    def _fragment_metadata(self, fragment):
        """
        This function houses all the MBIM packet knowledge.

        @param fragment: Raw bytes of one MBIM fragment (sliceable buffer).
        @returns: A tuple (transaction_id, total_fragments, current_fragment).
                For message types that are never fragmented,
                |total_fragments| is 1 and |current_fragment| is 0.
        @raises: MBIMComplianceChannelError if the fragment is too short to
                contain the headers it should have.

        """
        # All packets have a message header.
        message_header_size = struct.calcsize(self.MESSAGE_HEADER_FORMAT)
        if len(fragment) < message_header_size:
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'Corrupted fragment |%s| does not have an MBIM header.' %
                    fragment)

        message_type, _, transaction_id = struct.unpack_from(
                self.MESSAGE_HEADER_FORMAT,
                fragment)

        if message_type in self.MBIM_FRAGMENTED_MESSAGES:
            # The fragment header immediately follows the message header.
            fragment = fragment[message_header_size:]
            if len(fragment) < struct.calcsize(self.FRAGMENT_HEADER_FORMAT):
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceChannelError,
                        'Corrupted fragment |%s| does not have a fragment '
                        'header. ' %
                        fragment)

            total_fragments, current_fragment = struct.unpack_from(
                    self.FRAGMENT_HEADER_FORMAT,
                    fragment)
        else:
            # For other types, there is only one 'fragment'.
            total_fragments = 1
            current_fragment = 0

        return transaction_id, total_fragments, current_fragment


    def _verify_endpoint_open(self):
        """
        Check that the endpoint subprocess is still usable.

        @raises: MBIMComplianceChannelError if the channel has been closed or
                the endpoint subprocess is no longer alive.

        """
        endpoint_process = getattr(self, '_endpoint_process', None)
        if endpoint_process is None:
            # |close| drops the process handle; report that clearly instead of
            # failing with an AttributeError on None.
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannel has already been closed.')
        elif not endpoint_process.is_alive():
            mbim_errors.log_and_raise(
                    mbim_errors.MBIMComplianceChannelError,
                    'MBIMChannelEndpoint died unexpectedly. '
                    'The actual exception can be found in log entries from the '
                    'subprocess.')