xref: /aosp_15_r20/external/autotest/client/cros/dhcp_test_base.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Base class for DHCP tests.  This class just sets up a little bit of plumbing,
8like a virtual ethernet device with one end that looks like a real ethernet
9device to shill and a DHCP test server on the end that doesn't look like a real
10ethernet interface to shill.  Child classes should override test_body() with the
11logic of their test.  The plumbing of DhcpTestBase is accessible via properties.
12"""
13
14from __future__ import absolute_import
15from __future__ import division
16from __future__ import print_function
17
18import logging
19import six
20from six.moves import filter
21from six.moves import range
22import socket
23import struct
24import time
25import traceback
26
27from autotest_lib.client.bin import test
28from autotest_lib.client.common_lib import error
29from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
30from autotest_lib.client.cros import dhcp_handling_rule
31from autotest_lib.client.cros import dhcp_packet
32from autotest_lib.client.cros import dhcp_test_server
33from autotest_lib.client.cros.networking import shill_proxy
34
35
36# These are keys that may be used with the DBus dictionary returned from
37# DhcpTestBase.get_interface_ipconfig().
38DHCPCD_KEY_NAMESERVERS = 'NameServers'
39DHCPCD_KEY_GATEWAY = 'Gateway'
40DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast'
41DHCPCD_KEY_ADDRESS = 'Address'
42DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen'
43DHCPCD_KEY_DOMAIN_NAME = 'DomainName'
44DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname'
45DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'
46
47# We should be able to complete a DHCP negotiation in this amount of time.
48DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10
49
50# After DHCP completes, an ipconfig should appear shortly after
51IPCONFIG_POLL_COUNT = 5
52IPCONFIG_POLL_PERIOD_SECONDS = 0.5
53
54class DhcpTestBase(test.test):
55    """Parent class for tests that work verify DHCP behavior."""
56    version = 1
57
58    def __init__(self, job, bindir, outputdir, namespace='autotest'):
59        test.test.__init__(self, job, bindir, outputdir)
60        self._namespace = namespace
61
62    @staticmethod
63    def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
64        """
65        Create a new IPv4 address in a subnet by bitwise and'ing an existing
66        address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
67        |ip_suffix|.  For safety, bitwise or the suffix with the complement of
68        the subnet mask.
69
70        Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")
71
72        The example usage will return "192.168.1.105".
73
74        @param subnet_mask string subnet mask, e.g. "255.255.255.0"
75        @param ip_in_subnet string an IP address in the desired subnet
76        @param ip_suffix string suffix desired for new address, e.g. "0.0.0.105"
77
78        @return string IP address on in the same subnet with specified suffix.
79
80        """
81        mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0]
82        subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0]
83        suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0]
84        return socket.inet_ntoa(struct.pack('!I', (subnet | suffix)))
85
86
87    def get_device(self, interface_name):
88        """Finds the corresponding Device object for an interface with
89        the name |interface_name|.
90
91        @param interface_name string The name of the interface to check.
92
93        @return DBus interface object representing the associated device.
94
95        """
96        return self.shill_proxy.find_object('Device',
97                                            {'Name': interface_name})
98
99
100    def find_ethernet_service(self, interface_name):
101        """Finds the corresponding service object for an Ethernet interface.
102
103        @param interface_name string The name of the associated interface
104
105        @return Service object representing the associated service.
106
107        """
108        device = self.get_device(interface_name)
109        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
110        return self.shill_proxy.find_object('Service', {'Device': device_path})
111
112
113    def get_interface_ipconfig_objects(self, interface_name):
114        """
115        Returns a list of dbus object proxies for |interface_name|.
116        Returns an empty list if no such interface exists.
117
118        @param interface_name string name of the device to query (e.g., "eth0").
119
120        @return list of objects representing DBus IPConfig RPC endpoints.
121
122        """
123        device = self.get_device(interface_name)
124        if device is None:
125            return []
126
127        if six.PY2:
128            device_properties = device.GetProperties(utf8_strings=True)
129        else:
130            device_properties = device.GetProperties()
131        proxy = self.shill_proxy
132
133        ipconfig_object = proxy.DBUS_TYPE_IPCONFIG
134        return list(filter(bool,
135                      [ proxy.get_dbus_object(ipconfig_object, property_path)
136                        for property_path in device_properties['IPConfigs'] ]))
137
138
139    def get_interface_ipconfig(self, interface_name):
140        """
141        Returns a dictionary containing settings for an |interface_name| set
142        via DHCP.  Returns None if no such interface or setting bundle on
143        that interface can be found in shill.
144
145        @param interface_name string name of the device to query (e.g., "eth0").
146
147        @return dict containing the the properties of the IPConfig stripped
148            of DBus meta-data or None.
149
150        """
151        dhcp_properties = None
152        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
153            logging.info('Looking at ipconfig %r', ipconfig)
154            if six.PY2:
155                ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
156            else:
157                ipconfig_properties = ipconfig.GetProperties()
158            if 'Method' not in ipconfig_properties:
159                logging.info('Found ipconfig object with no method field')
160                continue
161            if ipconfig_properties['Method'] != 'dhcp':
162                logging.info('Found ipconfig object with method != dhcp')
163                continue
164            if dhcp_properties != None:
165                raise error.TestFail('Found multiple ipconfig objects '
166                                     'with method == dhcp')
167            dhcp_properties = ipconfig_properties
168        if dhcp_properties is None:
169            logging.info('Did not find IPConfig object with method == dhcp')
170            return None
171        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
172        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)
173
174
175    def run_once(self):
176        self._server = None
177        self._server_ip = None
178        self._ethernet_pair = None
179        self._server = None
180        self._shill_proxy = shill_proxy.ShillProxy()
181        try:
182            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
183                    interface_ns=self._namespace,
184                    peer_interface_name='pseudoethernet0',
185                    peer_interface_ip=None)
186            self._ethernet_pair.setup()
187            if not self._ethernet_pair.is_healthy:
188                raise error.TestFail('Could not create virtual ethernet pair.')
189            self._server_ip = self._ethernet_pair.interface_ip
190            self._server = dhcp_test_server.DhcpTestServer(
191                    interface=self._ethernet_pair.interface_name,
192                    ingress_address='',
193                    namespace=self._namespace)
194            self._server.start()
195            if not self._server.is_healthy:
196                raise error.TestFail('Could not start DHCP test server.')
197            self._subnet_mask = self._ethernet_pair.interface_subnet_mask
198            self.test_body()
199        except (error.TestFail, error.TestNAError):
200            # Pass these through without modification.
201            raise
202        except Exception as e:
203            logging.error('Caught exception: %s.', str(e))
204            logging.error('Trace: %s', traceback.format_exc())
205            raise error.TestFail('Caught exception: %s.' % str(e))
206        finally:
207            if self._server is not None:
208                self._server.stop()
209            if self._ethernet_pair is not None:
210                self._ethernet_pair.teardown()
211
212    def test_body(self):
213        """
214        Override this method with the body of your test.  You may safely assume
215        that the the properties exposed by DhcpTestBase correctly return
216        references to the test apparatus.
217        """
218        raise error.TestFail('No test body implemented')
219
220    @property
221    def server_ip(self):
222        """
223        Return the IP address of the side of the interface that the DHCP test
224        server is bound to.  The server itself is bound the the broadcast
225        address on the interface.
226        """
227        return self._server_ip
228
229    @property
230    def server(self):
231        """
232        Returns a reference to the DHCP test server.  Use this to add handlers
233        and run tests.
234        """
235        return self._server
236
237    @property
238    def ethernet_pair(self):
239        """
240        Returns a reference to the virtual ethernet pair created to run DHCP
241        tests on.
242        """
243        return self._ethernet_pair
244
245    @property
246    def shill_proxy(self):
247        """
248        Returns a the shill proxy instance.
249        """
250        return self._shill_proxy
251
252    def negotiate_and_check_lease(self,
253                                  dhcp_options,
254                                  custom_fields={},
255                                  disable_check=False):
256        """
257        Perform DHCP lease negotiation, and ensure that the resulting
258        ipconfig matches the DHCP options provided to the server.
259
260        @param dhcp_options dict of properties the DHCP server should provide.
261        @param custom_fields dict of custom DHCP parameters to add to server.
262        @param disable_check bool whether to perform IPConfig parameter
263             checking.
264
265        """
266        if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
267            raise error.TestFail('You must specify OPTION_REQUESTED_IP to '
268                                 'negotiate a DHCP lease')
269        intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]
270        # Build up the handling rules for the server and start the test.
271        rules = []
272        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
273                intended_ip,
274                self.server_ip,
275                dhcp_options,
276                custom_fields))
277        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
278                intended_ip,
279                self.server_ip,
280                dhcp_options,
281                custom_fields))
282        rules[-1].is_final_handler = True
283        self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
284        logging.info('Server is negotiating new lease with options: %s',
285                     dhcp_options)
286        self.server.wait_for_test_to_finish()
287        if not self.server.last_test_passed:
288            raise error.TestFail(
289                'Test failed: active rule is %s' % self.server.current_rule)
290
291        if disable_check:
292            logging.info('Skipping check of negotiated DHCP lease parameters.')
293        else:
294            self.wait_for_dhcp_propagation()
295            self.check_dhcp_config(dhcp_options)
296
297    def wait_for_dhcp_propagation(self):
298        """
299        Wait for configuration to propagate over dbus to shill.
300        TODO(wiley) Make this event based.  This is pretty sloppy.
301        """
302        time.sleep(0.1)
303
304    def check_dhcp_config(self, dhcp_options):
305        """
306        Compare the DHCP ipconfig with DHCP lease parameters to ensure
307        that the DUT attained the correct values.
308
309        @param dhcp_options dict of properties the DHCP server provided.
310
311        """
312        # The config is what the interface was actually configured with, as
313        # opposed to dhcp_options, which is what the server expected it be
314        # configured with.
315        for attempt in range(IPCONFIG_POLL_COUNT):
316            dhcp_config = self.get_interface_ipconfig(
317                    self.ethernet_pair.peer_interface_name)
318            if dhcp_config is not None:
319                break
320            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
321        else:
322            raise error.TestFail('Failed to retrieve DHCP ipconfig object '
323                                 'from shill.')
324
325        logging.debug('Got DHCP config: %s', str(dhcp_config))
326        expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
327        configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
328        if expected_address != configured_address:
329            raise error.TestFail('Interface configured with IP address not '
330                                 'granted by the DHCP server after DHCP '
331                                 'negotiation.  Expected %s but got %s.' %
332                                 (expected_address, configured_address))
333
334        # While DNS related settings only propagate to the system when the
335        # service is marked as the default service, we can still check the
336        # IP address on the interface, since that is set immediately.
337        interface_address = self.ethernet_pair.peer_interface_ip
338        if expected_address != interface_address:
339            raise error.TestFail('shill somehow knew about the proper DHCP '
340                                 'assigned address: %s, but configured the '
341                                 'interface with something completely '
342                                 'different: %s.' %
343                                 (expected_address, interface_address))
344
345        expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
346        configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
347        if (expected_dns_servers is not None and
348            expected_dns_servers != configured_dns_servers):
349            raise error.TestFail('Expected to be configured with DNS server '
350                                 'list %s, but was configured with %s '
351                                 'instead.' % (expected_dns_servers,
352                                               configured_dns_servers))
353
354        expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
355        configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
356        if (expected_domain_name is not None and
357            expected_domain_name != configured_domain_name):
358            raise error.TestFail('Expected to be configured with domain '
359                                 'name %s, but got %s instead.' %
360                                 (expected_domain_name, configured_domain_name))
361
362        expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME)
363        configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME)
364        if (expected_host_name is not None and
365            expected_host_name != configured_host_name):
366            raise error.TestFail('Expected to be configured with host '
367                                 'name %s, but got %s instead.' %
368                                 (expected_host_name, configured_host_name))
369
370        expected_search_list = dhcp_options.get(
371                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
372        configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
373        if (expected_search_list is not None and
374            expected_search_list != configured_search_list):
375            raise error.TestFail('Expected to be configured with domain '
376                                 'search list %s, but got %s instead.' %
377                                 (expected_search_list, configured_search_list))
378
379        expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
380        if (not expected_routers and
381            dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
382            classless_static_routes = dhcp_options[
383                dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
384            for prefix, destination, gateway in classless_static_routes:
385                if not prefix:
386                    logging.info('Using %s as the default gateway', gateway)
387                    expected_routers = [ gateway ]
388                    break
389        configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
390        if expected_routers and expected_routers[0] != configured_router:
391            raise error.TestFail('Expected to be configured with gateway %s, '
392                                 'but got %s instead.' %
393                                 (expected_routers[0], configured_router))
394
395        self.server.wait_for_test_to_finish()
396        if not self.server.last_test_passed:
397            raise error.TestFail('Test server didn\'t get all the messages it '
398                                 'was told to expect for renewal.')
399