1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15from unittest.mock import MagicMock, patch
16from absl.testing import parameterized
17from mobly import asserts
18from mobly import base_test
19from mobly import config_parser
20from mobly.controllers.android_device_lib.adb import AdbError
21from net_tests_utils.host.python.apf_utils import (
22    ApfCapabilities,
23    PatternNotFoundException,
24    UnsupportedOperationException,
25    get_apf_capabilities,
26    get_apf_counter,
27    get_apf_counters_from_dumpsys,
28    get_ipv4_addresses,
29    get_ipv6_addresses,
30    get_hardware_address,
31    is_send_raw_packet_downstream_supported,
32    is_packet_capture_supported,
33    start_capture_packets,
34    stop_capture_packets,
35    get_matched_packet_counts,
36    send_raw_packet_downstream,
37)
38from net_tests_utils.host.python.assert_utils import UnexpectedBehaviorError
39
40TEST_IFACE_NAME = "eth0"
41TEST_PACKET_IN_HEX = "AABBCCDDEEFF"
42
43
44class TestApfUtils(base_test.BaseTestClass, parameterized.TestCase):
45
46  def __init__(self, configs: config_parser.TestRunConfig):
47    super().__init__(configs)
48
49  def setup_test(self):
50    self.mock_ad = MagicMock()  # Mock Android device object
51
52  @patch("net_tests_utils.host.python.adb_utils.get_dumpsys_for_service")
53  def test_get_apf_counters_from_dumpsys_success(
54      self, mock_get_dumpsys: MagicMock
55  ) -> None:
56    mock_get_dumpsys.return_value = """
57IpClient.wlan0
58  APF packet counters:
59    COUNTER_NAME1: 123
60    COUNTER_NAME2: 456
61"""
62    counters = get_apf_counters_from_dumpsys(self.mock_ad, "wlan0")
63    asserts.assert_equal(counters, {"COUNTER_NAME1": 123, "COUNTER_NAME2": 456})
64
65  @patch("net_tests_utils.host.python.adb_utils.get_dumpsys_for_service")
66  def test_get_apf_counters_from_dumpsys_exceptions(
67      self, mock_get_dumpsys: MagicMock
68  ) -> None:
69    test_cases = [
70        "",
71        "IpClient.wlan0\n",
72        "IpClient.wlan0\n APF packet counters:\n",
73        """
74IpClient.wlan1
75  APF packet counters:
76    COUNTER_NAME1: 123
77    COUNTER_NAME2: 456
78""",
79    ]
80
81    for dumpsys_output in test_cases:
82      mock_get_dumpsys.return_value = dumpsys_output
83      with asserts.assert_raises(PatternNotFoundException):
84        get_apf_counters_from_dumpsys(self.mock_ad, "wlan0")
85
86  @patch("net_tests_utils.host.python.apf_utils.get_apf_counters_from_dumpsys")
87  def test_get_apf_counter(self, mock_get_counters: MagicMock) -> None:
88    iface = "wlan0"
89    mock_get_counters.return_value = {
90        "COUNTER_NAME1": 123,
91        "COUNTER_NAME2": 456,
92    }
93    asserts.assert_equal(
94        get_apf_counter(self.mock_ad, iface, "COUNTER_NAME1"), 123
95    )
96    # Not found
97    asserts.assert_equal(
98        get_apf_counter(self.mock_ad, iface, "COUNTER_NAME3"), 0
99    )
100
101  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
102  def test_get_hardware_address_success(
103      self, mock_adb_shell: MagicMock
104  ) -> None:
105    mock_adb_shell.return_value = """
10646: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
107 link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
108"""
109    mac_address = get_hardware_address(self.mock_ad, "wlan0")
110    asserts.assert_equal(mac_address, "72:05:77:82:21:E0")
111
112  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
113  def test_get_hardware_address_not_found(
114      self, mock_adb_shell: MagicMock
115  ) -> None:
116    mock_adb_shell.return_value = "Some output without MAC address"
117    with asserts.assert_raises(PatternNotFoundException):
118      get_hardware_address(self.mock_ad, "wlan0")
119
120  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
121  def test_get_ipv4_addresses_success(
122      self, mock_adb_shell: MagicMock
123  ) -> None:
124    mock_adb_shell.return_value = """
12554: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 state UP qlen 1000
126    inet 192.168.195.162/24 brd 192.168.195.255 scope global wlan0
127       valid_lft forever preferred_lft forever
128    inet 192.168.200.1/24 brd 192.168.200.255 scope global wlan0
129       valid_lft forever preferred_lft forever
130"""
131    ip_addresses = get_ipv4_addresses(self.mock_ad, "wlan0")
132    asserts.assert_equal(ip_addresses, ["192.168.195.162", "192.168.200.1"])
133
134  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
135  def test_get_ipv4_addresses_not_found(
136      self, mock_adb_shell: MagicMock
137  ) -> None:
138    mock_adb_shell.return_value = ""
139    ip_addresses = get_ipv4_addresses(self.mock_ad, "wlan0")
140    asserts.assert_equal(ip_addresses, [])
141
142  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
143  def test_get_ipv6_addresses_success(
144      self, mock_adb_shell: MagicMock
145  ) -> None:
146    mock_adb_shell.return_value = """
14754: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 state UP qlen 1000
148    inet6 fe80::10a3:5dff:fe52:de32/64 scope link
149        valid_lft forever preferred_lft forever
150    inet6 2001:b400:e53f:164e:9c1e:780e:d1:4658/64 scope global dynamic mngtmpaddr noprefixroute
151        valid_lft 6995sec preferred_lft 6995sec
152    inet6 fe80::3aff:2199:2d8e:20d1/64 scope link noprefixroute
153        valid_lft forever preferred_lft forever
154"""
155    ip_addresses = get_ipv6_addresses(self.mock_ad, "wlan0")
156    asserts.assert_equal(ip_addresses,
157                         ["fe80::10a3:5dff:fe52:de32",
158                          "2001:b400:e53f:164e:9c1e:780e:d1:4658",
159                          "fe80::3aff:2199:2d8e:20d1"])
160
161  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
162  def test_get_ipv6_address_not_found(
163          self, mock_adb_shell: MagicMock
164  ) -> None:
165    mock_adb_shell.return_value = ""
166    ip_addresses = get_ipv6_addresses(self.mock_ad, "wlan0")
167    asserts.assert_equal(ip_addresses, [])
168
169  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
170  def test_send_raw_packet_downstream_success(
171      self, mock_adb_shell: MagicMock
172  ) -> None:
173    mock_adb_shell.return_value = ""  # Successful command output
174    send_raw_packet_downstream(
175        self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
176    )
177    mock_adb_shell.assert_called_once_with(
178        self.mock_ad,
179        "cmd network_stack send-raw-packet-downstream"
180        f" {TEST_IFACE_NAME} {TEST_PACKET_IN_HEX}",
181    )
182
183  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
184  def test_send_raw_packet_downstream_failure(
185      self, mock_adb_shell: MagicMock
186  ) -> None:
187    mock_adb_shell.return_value = (  # Unexpected command output
188        "Any Unexpected Output"
189    )
190    with asserts.assert_raises(UnexpectedBehaviorError):
191      send_raw_packet_downstream(
192          self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
193      )
194    asserts.assert_true(
195        is_send_raw_packet_downstream_supported(self.mock_ad),
196        "Send raw packet should be supported.",
197    )
198
199  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
200  def test_send_raw_packet_downstream_unsupported(
201      self, mock_adb_shell: MagicMock
202  ) -> None:
203    mock_adb_shell.side_effect = AdbError(
204        cmd="", stdout="Unknown command", stderr="", ret_code=3
205    )
206    with asserts.assert_raises(UnsupportedOperationException):
207      send_raw_packet_downstream(
208          self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
209      )
210    asserts.assert_false(
211        is_send_raw_packet_downstream_supported(self.mock_ad),
212        "Send raw packet should not be supported.",
213    )
214
215  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
216  def test_start_capture_success(
217          self, mock_adb_shell: MagicMock
218  ) -> None:
219      mock_adb_shell.return_value = "success"  # Successful command output
220      start_capture_packets(
221          self.mock_ad, TEST_IFACE_NAME
222      )
223      mock_adb_shell.assert_called_once_with(
224          self.mock_ad,
225          "cmd network_stack capture start"
226          f" {TEST_IFACE_NAME}"
227      )
228
229  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
230  def test_start_capture_failure(
231          self, mock_adb_shell: MagicMock
232  ) -> None:
233      mock_adb_shell.return_value = (  # Unexpected command output
234          "Any Unexpected Output"
235      )
236      with asserts.assert_raises(UnexpectedBehaviorError):
237          start_capture_packets(
238              self.mock_ad, TEST_IFACE_NAME
239          )
240      asserts.assert_true(
241          is_packet_capture_supported(self.mock_ad),
242          "Start capturing packets should be supported.",
243      )
244
245  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
246  def test_start_capture_unsupported(
247          self, mock_adb_shell: MagicMock
248  ) -> None:
249      mock_adb_shell.side_effect = AdbError(
250          cmd="", stdout="Unknown command", stderr="", ret_code=3
251      )
252      with asserts.assert_raises(UnsupportedOperationException):
253          start_capture_packets(
254              self.mock_ad, TEST_IFACE_NAME
255          )
256      asserts.assert_false(
257          is_packet_capture_supported(self.mock_ad),
258          "Start capturing packets should not be supported.",
259      )
260
261  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
262  def test_stop_capture_success(
263          self, mock_adb_shell: MagicMock
264  ) -> None:
265      mock_adb_shell.return_value = "success"  # Successful command output
266      stop_capture_packets(
267          self.mock_ad, TEST_IFACE_NAME
268      )
269      mock_adb_shell.assert_called_once_with(
270          self.mock_ad,
271          "cmd network_stack capture stop"
272          f" {TEST_IFACE_NAME}"
273      )
274
275  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
276  def test_stop_capture_failure(
277          self, mock_adb_shell: MagicMock
278  ) -> None:
279      mock_adb_shell.return_value = (  # Unexpected command output
280          "Any Unexpected Output"
281      )
282      with asserts.assert_raises(UnexpectedBehaviorError):
283          stop_capture_packets(
284              self.mock_ad, TEST_IFACE_NAME
285          )
286      asserts.assert_true(
287          is_packet_capture_supported(self.mock_ad),
288          "Stop capturing packets should be supported.",
289      )
290
291  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
292  def test_stop_capture_unsupported(
293          self, mock_adb_shell: MagicMock
294  ) -> None:
295      mock_adb_shell.side_effect = AdbError(
296          cmd="", stdout="Unknown command", stderr="", ret_code=3
297      )
298      with asserts.assert_raises(UnsupportedOperationException):
299          stop_capture_packets(
300              self.mock_ad, TEST_IFACE_NAME
301          )
302      asserts.assert_false(
303          is_packet_capture_supported(self.mock_ad),
304          "Stop capturing packets should not be supported.",
305      )
306
307  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
308  def test_get_matched_packet_counts_success(
309          self, mock_adb_shell: MagicMock
310  ) -> None:
311      mock_adb_shell.return_value = "10"  # Successful command output
312      get_matched_packet_counts(
313          self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
314      )
315      mock_adb_shell.assert_called_once_with(
316          self.mock_ad,
317          "cmd network_stack capture matched-packet-counts"
318          f" {TEST_IFACE_NAME} {TEST_PACKET_IN_HEX}"
319      )
320
321  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
322  def test_get_matched_packet_counts_failure(
323          self, mock_adb_shell: MagicMock
324  ) -> None:
325      mock_adb_shell.return_value = (  # Unexpected command output
326          "Any Unexpected Output"
327      )
328      with asserts.assert_raises(UnexpectedBehaviorError):
329          get_matched_packet_counts(
330              self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
331          )
332      asserts.assert_true(
333          is_packet_capture_supported(self.mock_ad),
334          "Get matched packet counts should be supported.",
335      )
336
337  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
338  def test_get_matched_packet_counts_unsupported(
339          self, mock_adb_shell: MagicMock
340  ) -> None:
341      mock_adb_shell.side_effect = AdbError(
342          cmd="", stdout="Unknown command", stderr="", ret_code=3
343      )
344      with asserts.assert_raises(UnsupportedOperationException):
345          get_matched_packet_counts(
346              self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
347          )
348      asserts.assert_false(
349          is_packet_capture_supported(self.mock_ad),
350          "Get matched packet counts should not be supported.",
351      )
352
353  @parameterized.parameters(
354      ("2,2048,1", ApfCapabilities(2, 2048, 1)),  # Valid input
355      ("3,1024,0", ApfCapabilities(3, 1024, 0)),  # Valid input
356      ("invalid,output", ApfCapabilities(0, 0, 0)),  # Invalid input
357      ("", ApfCapabilities(0, 0, 0)),  # Empty input
358  )
359  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
360  def test_get_apf_capabilities(
361      self, mock_output, expected_result, mock_adb_shell
362  ):
363    """Tests the get_apf_capabilities function with various inputs and expected results."""
364    # Configure the mock adb_shell to return the specified output
365    mock_adb_shell.return_value = mock_output
366
367    # Call the function under test
368    result = get_apf_capabilities(self.mock_ad, "wlan0")
369
370    # Assert that the result matches the expected result
371    asserts.assert_equal(result, expected_result)
372