1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15from dataclasses import dataclass
16import re
17from mobly import asserts
18from mobly.controllers import android_device
19from mobly.controllers.android_device_lib.adb import AdbError
20from net_tests_utils.host.python import adb_utils, assert_utils
21
22
class PatternNotFoundException(Exception):
  """Signals that a required pattern is missing from the parsed text."""
25
26
class UnsupportedOperationException(Exception):
  """Signals that the device does not support the requested operation."""
29
30
def get_apf_counter(
    ad: android_device.AndroidDevice, iface: str, counter_name: str
) -> int:
  """Returns the value of a single APF counter for the given interface.

  Args:
      ad: The Android device object.
      iface: The name of the network interface (e.g., "wlan0").
      counter_name: The counter name as it appears in the dumpsys output.

  Returns:
      The counter value, or 0 when the counter is not listed.
  """
  all_counters = get_apf_counters_from_dumpsys(ad, iface)
  if counter_name in all_counters:
    return all_counters[counter_name]
  return 0
36
37
def get_apf_counters_from_dumpsys(
    ad: android_device.AndroidDevice, iface_name: str
) -> dict[str, int]:
  """Parses the APF packet counters of an interface from dumpsys output.

  The `network_stack` dumpsys is fetched from the device, the
  `IpClient.<iface_name>` section is located in it, and the
  `APF packet counters:` list inside that section is converted to a dict.

  Args:
      ad: The Android device object.
      iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
      A dict mapping each counter name to its integer value.

  Raises:
      PatternNotFoundException: If the IpClient section, the APF counters
        section, or the individual counters cannot be found.
  """
  dumpsys = adb_utils.get_dumpsys_for_service(ad, "network_stack")

  # Extract IpClient section of the specified interface.
  # This takes inputs like:
  # IpClient.wlan0
  #   ...
  # IpClient.wlan1
  #   ...
  # The interface name is escaped so that regex metacharacters in it (e.g. the
  # dot in a name such as "eth0.100") are matched literally.
  # Note that `\s` also matches a newline, so a blank line together with the
  # line that follows it is consumed as part of the section.
  iface_pattern = re.compile(
      r"^IpClient\." + re.escape(iface_name) + r"\n" + r"((^\s.*\n)+)",
      re.MULTILINE,
  )
  iface_result = iface_pattern.search(dumpsys)
  if iface_result is None:
    raise PatternNotFoundException("Cannot find IpClient for " + iface_name)
  iface_section = iface_result.group(0)

  # Extract APF counters section from IpClient section, which looks like:
  #     APF packet counters:
  #       COUNTER_NAME: VALUE
  #       ....
  apf_pattern = re.compile(
      r"APF packet counters:.*\n.(\s+[A-Z_0-9]+: \d+\n)+", re.MULTILINE
  )
  apf_result = apf_pattern.search(iface_section)
  if apf_result is None:
    raise PatternNotFoundException(
        "Cannot find APF counters in text: " + iface_section
    )
  apf_section = apf_result.group(0)

  # Extract key-value pairs from APF counters section into a list of tuples,
  # e.g. [('COUNTER1', '1'), ('COUNTER2', '2')].
  counter_pattern = re.compile(r"(?P<name>[A-Z_0-9]+): (?P<value>\d+)")
  counter_result = counter_pattern.findall(apf_section)
  # findall() returns an empty list (never None) when nothing matches.
  if not counter_result:
    raise PatternNotFoundException(
        "Cannot extract APF counters in text: " + apf_section
    )

  # Convert into a dict.
  result = {name: int(value_str) for name, value_str in counter_result}

  ad.log.debug("Getting apf counters: " + str(result))
  return result
85
def get_ipv4_addresses(
    ad: android_device.AndroidDevice, iface_name: str
) -> list[str]:
  """Retrieves the IPv4 addresses of a given interface on an Android device.

  This function executes an ADB shell command (`ip -4 address show`) to get the
  network interface information and extracts every IPv4 address, without its
  prefix length, from the `inet` lines of the output.

  Args:
      ad: The Android device object.
      iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
      The IPv4 addresses of the interface as a list of strings, in the order
      they appear in the command output. An empty list is returned when no
      address is found; no exception is raised for that case.
  """
  # output format:
  # 54: wlan2: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
  #     inet 192.168.195.162/24 brd 192.168.195.255 scope global wlan2
  #         valid_lft forever preferred_lft forever
  #     inet 192.168.1.1/24 brd 192.168.1.255 scope global wlan2
  #         valid_lft forever preferred_lft forever
  output = adb_utils.adb_shell(ad, f"ip -4 address show {iface_name}")
  pattern = r"inet\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\/\d+"

  # re.findall() already yields an empty list when nothing matches, so its
  # result can be returned directly.
  return re.findall(pattern, output)
117
def get_ipv6_addresses(
    ad: android_device.AndroidDevice, iface_name: str
) -> list[str]:
  """Retrieves the IPv6 addresses of a given interface on an Android device.

  This function executes an ADB shell command (`ip -6 address show`) to get the
  network interface information and extracts every IPv6 address, without its
  prefix length, from the `inet6` lines of the output.

  Args:
      ad: The Android device object.
      iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
      The IPv6 addresses of the interface as a list of strings, in the order
      they appear in the command output. An empty list is returned when no
      address is found; no exception is raised for that case.
  """
  # output format
  # 54: wlan2: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 state UP qlen 1000
  #     inet6 fe80::10a3:5dff:fe52:de32/64 scope link
  #         valid_lft forever preferred_lft forever
  #     inet6 fe80::1233:aadb:3d32:1234/64 scope link
  #         valid_lft forever preferred_lft forever
  output = adb_utils.adb_shell(ad, f"ip -6 address show {iface_name}")
  pattern = r"inet6\s+([0-9a-fA-F:]+)\/\d+"

  # re.findall() already yields an empty list when nothing matches, so its
  # result can be returned directly.
  return re.findall(pattern, output)
149
def get_hardware_address(
    ad: android_device.AndroidDevice, iface_name: str
) -> str:
  """Looks up the hardware (MAC) address of a network interface.

  Args:
      ad: The Android device object.
      iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
      The MAC address as colon-separated hex octets in uppercase,
      e.g. 12:34:56:78:90:AB

  Raises:
      PatternNotFoundException: If no `link/ether` address is present in the
      command output.
  """
  link_info = adb_utils.adb_shell(ad, f"ip link show {iface_name}")

  # The address sits on the "link/ether" line of the `ip link` output:
  # 46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
  #    link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
  mac_match = re.search(
      r"link/ether (([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})", link_info
  )
  if mac_match is None:
    raise PatternNotFoundException(
        "Cannot get hardware address for " + iface_name
    )

  # Group 1 is the complete address; normalise it to uppercase.
  return mac_match.group(1).upper()
180
def is_packet_capture_supported(
    ad: android_device.AndroidDevice,
) -> bool:
  """Checks whether the packet `capture` shell commands work on the device.

  The start, stop and matched-packet-counts commands are each invoked with
  empty arguments, in that order, and each call is required through
  `assert_utils.expect_throws` to raise `assert_utils.UnexpectedBehaviorError`.

  Args:
      ad: The Android device object.

  Returns:
      False if a probe ends with `assert_utils.UnexpectedExceptionError`,
      True if all three probes complete.
  """
  # Invoke the shell commands with empty arguments and see how NetworkStack
  # responds. If supported, an IllegalArgumentException with help page will be
  # printed.
  probes = (
      lambda: start_capture_packets(ad, ""),
      lambda: stop_capture_packets(ad, ""),
      lambda: get_matched_packet_counts(ad, "", ""),
  )

  try:
    for probe in probes:
      assert_utils.expect_throws(probe, assert_utils.UnexpectedBehaviorError)
  except assert_utils.UnexpectedExceptionError:
    # NOTE(review): presumably expect_throws raises this when the probe fails
    # with a different exception such as UnsupportedOperationException --
    # confirm against assert_utils.
    return False

  # No probe was rejected, so regard packet capture as supported.
  return True
205
def is_send_raw_packet_downstream_supported(
    ad: android_device.AndroidDevice,
) -> bool:
  """Checks whether `send-raw-packet-downstream` is usable on the device.

  The command is invoked with empty arguments purely as a probe; no packet is
  expected to be sent.

  Args:
      ad: The Android device object.

  Returns:
      True if the probe raises `assert_utils.UnexpectedBehaviorError`, which
      `send_raw_packet_downstream` does when the command produces output.
      False if the probe raises `UnsupportedOperationException`, or if it
      completes without raising at all.
  """
  try:
    # Invoke the shell command with empty argument and see how NetworkStack respond.
    # If supported, an IllegalArgumentException with help page will be printed.
    send_raw_packet_downstream(ad, "", "")
  except assert_utils.UnexpectedBehaviorError:
    return True
  except UnsupportedOperationException:
    return False

  # The probe produced no output and was not rejected. This path used to fall
  # off the end of the function and return None; return an explicit bool with
  # the same truthiness so the declared return type holds.
  return False
217
218
def send_raw_packet_downstream(
    ad: android_device.AndroidDevice,
    iface_name: str,
    packet_in_hex: str,
) -> None:
  """Sends a raw packet over the specified downstream interface.

  The packet is handed to the `send-raw-packet-downstream` command of the
  NetworkStack process through an ADB shell command. It's primarily intended
  for testing purposes.

  Args:
      ad: The AndroidDevice object representing the connected device.
      iface_name: The name of the network interface to use (e.g., "wlan0",
        "eth0").
      packet_in_hex: The raw packet data starting from L2 header encoded in
        hexadecimal string format.

  Raises:
      UnsupportedOperationException: If the command is reported as
        "Unknown command" (raised by AdbOutputHandler).
      assert_utils.UnexpectedBehaviorError: If the command produces any
        output; an empty response is the only accepted result.

  Important Considerations:
      Security: This method only works on tethering downstream interfaces due
        to security restrictions.
      Packet Format: The `packet_in_hex` must be a valid hexadecimal
        representation of a packet starting from L2 header.
  """
  cmd = f"cmd network_stack send-raw-packet-downstream {iface_name} {packet_in_hex}"
  adb_output = AdbOutputHandler(ad, cmd).get_output()

  # An empty response is the success case; nothing more to do.
  if not adb_output:
    return

  raise assert_utils.UnexpectedBehaviorError(
    f"Got unexpected output: {adb_output} for command: {cmd}."
  )
259
def start_capture_packets(
    ad: android_device.AndroidDevice,
    iface_name: str
) -> None:
  """Starts packet capturing on a specified network interface.

  Packet capture is started by issuing the NetworkStack `capture start`
  command through an ADB shell command.
  This command only supports downstream tethering interface.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Raises:
    UnsupportedOperationException: If the command is reported as
      "Unknown command" (raised by AdbOutputHandler).
    assert_utils.UnexpectedBehaviorError: If the command output is anything
      other than "success".
  """
  cmd = f"cmd network_stack capture start {iface_name}"
  handler = AdbOutputHandler(ad, cmd)
  adb_output = handler.get_output()

  # The command reports "success" when capturing was started.
  if adb_output == "success":
    return

  raise assert_utils.UnexpectedBehaviorError(
    f"Got unexpected output: {adb_output} for command: {cmd}."
  )
283
def stop_capture_packets(
    ad: android_device.AndroidDevice,
    iface_name: str
) -> None:
  """Stops packet capturing on a specified network interface.

  Packet capture is stopped by issuing the NetworkStack `capture stop`
  command through an ADB shell command.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").

  Raises:
    UnsupportedOperationException: If the command is reported as
      "Unknown command" (raised by AdbOutputHandler).
    assert_utils.UnexpectedBehaviorError: If the command output is anything
      other than "success".
  """
  cmd = f"cmd network_stack capture stop {iface_name}"
  handler = AdbOutputHandler(ad, cmd)
  adb_output = handler.get_output()

  # The command reports "success" when capturing was stopped.
  if adb_output == "success":
    return

  raise assert_utils.UnexpectedBehaviorError(
    f"Got unexpected output: {adb_output} for command: {cmd}."
  )
306
def get_matched_packet_counts(
    ad: android_device.AndroidDevice,
    iface_name: str,
    packet_in_hex: str
) -> int:
  """Gets the number of captured packets matching a specific hexadecimal pattern.

  This function retrieves the count of captured packets on the specified
  network interface that match a given hexadecimal pattern, using the
  NetworkStack `capture matched-packet-counts` command through an ADB shell
  command.

  Args:
    ad: The Android device object.
    iface_name: The name of the network interface (e.g., "wlan0").
    packet_in_hex: The hexadecimal string representing the packet pattern.

  Returns:
    The number of matched packets as an integer.

  Raises:
    UnsupportedOperationException: If the command is reported as
      "Unknown command" (raised by AdbOutputHandler).
    assert_utils.UnexpectedBehaviorError: If the command output cannot be
      converted to an integer.
  """
  cmd = f"cmd network_stack capture matched-packet-counts {iface_name} {packet_in_hex}"

  # The command is expected to print the match count as a plain integer.
  adb_output = AdbOutputHandler(ad, cmd).get_output()
  try:
    return int(adb_output)
  except ValueError as e:
    # Chain explicitly so the traceback shows the parse failure as the cause.
    raise assert_utils.UnexpectedBehaviorError(
      f"Got unexpected exception: {e} for command: {cmd}."
    ) from e
337
@dataclass
class ApfCapabilities:
  """APF program support capabilities.

  See android.net.apf.ApfCapabilities.

  The constructor is the one generated by `@dataclass`; it takes the three
  attributes below, in this order, positionally or by keyword.

  Attributes:
      apf_version_supported (int): Version of APF instruction set supported for
        packet filtering. 0 indicates no support for packet filtering using APF
        programs.
      apf_ram_size (int): Size of APF ram.
      apf_packet_format (int): Format of packets passed to APF filter. Should be
        one of ARPHRD_*
  """

  apf_version_supported: int
  apf_ram_size: int
  apf_packet_format: int

  def __str__(self) -> str:
    """Returns a user-friendly string representation of the APF capabilities."""
    return (
        f"APF Version: {self.apf_version_supported}\n"
        f"Ram Size: {self.apf_ram_size} bytes\n"
        f"Packet Format: {self.apf_packet_format}"
    )
374
375
def get_apf_capabilities(
    ad: android_device.AndroidDevice, iface_name: str
) -> ApfCapabilities:
  """Queries the APF capabilities of an interface from NetworkStack.

  The `cmd network_stack apf <iface_name> capabilities` output is parsed as a
  comma-separated list of integers: APF version, RAM size and packet format,
  in that order.

  Args:
      ad: The Android device object.
      iface_name: The name of the network interface (e.g., "wlan0").

  Returns:
      The parsed ApfCapabilities. ApfCapabilities(0, 0, 0) is returned when
      the output cannot be parsed, i.e. a field is not an integer or fewer
      than three fields are present.
  """
  output = adb_utils.adb_shell(
      ad, f"cmd network_stack apf {iface_name} capabilities"
  )
  no_support = ApfCapabilities(0, 0, 0)
  try:
    values = [int(value_str) for value_str in output.split(",")]
  except ValueError:
    return no_support  # Conversion to integer failed
  if len(values) < 3:
    # Too few fields: treat like any other unparseable output rather than
    # failing with IndexError below.
    return no_support
  return ApfCapabilities(values[0], values[1], values[2])
387
388
def assume_apf_version_support_at_least(
    ad: android_device.AndroidDevice, iface_name: str, expected_version: int
) -> None:
  """Aborts the test class when the interface's APF version is too low.

  Args:
      ad: The Android device object.
      iface_name: The name of the network interface (e.g., "wlan0").
      expected_version: The minimum APF version required.
  """
  caps = get_apf_capabilities(ad, iface_name)
  version_too_low = caps.apf_version_supported < expected_version
  reason = (
      f"Supported apf version {caps.apf_version_supported} < expected version"
      f" {expected_version}"
  )
  asserts.abort_class_if(version_too_low, reason)
398
class AdbOutputHandler:
  """Runs an ADB shell command and turns "Unknown command" into an exception."""

  def __init__(self, ad, cmd):
    """Stores the device object and the shell command to run on it."""
    self._ad, self._cmd = ad, cmd

  def get_output(self) -> str:
    """Executes the stored command and returns its output.

    Returns:
        The result of `adb_utils.adb_shell` when the command succeeds. When it
        fails with AdbError, the stringified stdout of that error instead.

    Raises:
        UnsupportedOperationException: If the command fails with AdbError and
          its stdout contains "Unknown command".
    """
    try:
      shell_output = adb_utils.adb_shell(self._ad, self._cmd)
    except AdbError as e:
      # NOTE(review): if e.stdout is bytes, str() yields its repr form
      # (e.g. "b'...'") rather than decoded text -- confirm against mobly.
      failure_output = str(e.stdout)
      if "Unknown command" not in failure_output:
        return failure_output
      raise UnsupportedOperationException(
        f"{self._cmd} is not supported."
      )
    return shell_output