1*9c5db199SXin Li# Lint as: python2, python3 2*9c5db199SXin Li# Copyright 2015 The Chromium OS Authors. All rights reserved. 3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be 4*9c5db199SXin Li# found in the LICENSE file. 5*9c5db199SXin Li 6*9c5db199SXin Li""" 7*9c5db199SXin LiBase class for DHCPv6 tests. This class just sets up a little bit of plumbing, 8*9c5db199SXin Lilike a virtual ethernet device with one end that looks like a real ethernet 9*9c5db199SXin Lidevice to shill and a DHCPv6 test server on the end that doesn't look like a 10*9c5db199SXin Lireal ethernet interface to shill. Child classes should override test_body() 11*9c5db199SXin Liwith the logic of their test. 12*9c5db199SXin Li""" 13*9c5db199SXin Li 14*9c5db199SXin Lifrom __future__ import absolute_import 15*9c5db199SXin Lifrom __future__ import division 16*9c5db199SXin Lifrom __future__ import print_function 17*9c5db199SXin Li 18*9c5db199SXin Liimport logging 19*9c5db199SXin Liimport six 20*9c5db199SXin Lifrom six.moves import filter 21*9c5db199SXin Lifrom six.moves import range 22*9c5db199SXin Liimport time 23*9c5db199SXin Liimport traceback 24*9c5db199SXin Li 25*9c5db199SXin Lifrom autotest_lib.client.bin import test 26*9c5db199SXin Lifrom autotest_lib.client.common_lib import error 27*9c5db199SXin Lifrom autotest_lib.client.common_lib.cros import virtual_ethernet_pair 28*9c5db199SXin Lifrom autotest_lib.client.cros import dhcpv6_test_server 29*9c5db199SXin Lifrom autotest_lib.client.cros.networking import shill_proxy 30*9c5db199SXin Li 31*9c5db199SXin Li# These are keys that may be used with the DBus dictionary returned from 32*9c5db199SXin Li# Dhcpv6TestBase.get_interface_ipconfig(). 33*9c5db199SXin LiDHCPV6_KEY_ADDRESS = 'Address' 34*9c5db199SXin LiDHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix' 35*9c5db199SXin LiDHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength' 36*9c5db199SXin LiDHCPV6_KEY_NAMESERVERS = 'NameServers' 37*9c5db199SXin LiDHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains' 38*9c5db199SXin Li 39*9c5db199SXin Li# After DHCPv6 completes, an ipconfig should appear shortly after 40*9c5db199SXin LiIPCONFIG_POLL_COUNT = 5 41*9c5db199SXin LiIPCONFIG_POLL_PERIOD_SECONDS = 1 42*9c5db199SXin Li 43*9c5db199SXin Liclass Dhcpv6TestBase(test.test): 44*9c5db199SXin Li """Parent class for tests that work verify DHCPv6 behavior.""" 45*9c5db199SXin Li version = 1 46*9c5db199SXin Li 47*9c5db199SXin Li def get_device(self, interface_name): 48*9c5db199SXin Li """Finds the corresponding Device object for an interface with 49*9c5db199SXin Li the name |interface_name|. 50*9c5db199SXin Li 51*9c5db199SXin Li @param interface_name string The name of the interface to check. 52*9c5db199SXin Li 53*9c5db199SXin Li @return DBus interface object representing the associated device. 54*9c5db199SXin Li 55*9c5db199SXin Li """ 56*9c5db199SXin Li return self.shill_proxy.find_object('Device', 57*9c5db199SXin Li {'Name': interface_name}) 58*9c5db199SXin Li 59*9c5db199SXin Li 60*9c5db199SXin Li def find_ethernet_service(self, interface_name): 61*9c5db199SXin Li """Finds the corresponding service object for an Ethernet interface. 62*9c5db199SXin Li 63*9c5db199SXin Li @param interface_name string The name of the associated interface 64*9c5db199SXin Li 65*9c5db199SXin Li @return Service object representing the associated service. 66*9c5db199SXin Li 67*9c5db199SXin Li """ 68*9c5db199SXin Li device = self.get_device(interface_name) 69*9c5db199SXin Li device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path) 70*9c5db199SXin Li return self.shill_proxy.find_object('Service', {'Device': device_path}) 71*9c5db199SXin Li 72*9c5db199SXin Li 73*9c5db199SXin Li def get_interface_ipconfig_objects(self, interface_name): 74*9c5db199SXin Li """ 75*9c5db199SXin Li Returns a list of dbus object proxies for |interface_name|. 76*9c5db199SXin Li Returns an empty list if no such interface exists. 77*9c5db199SXin Li 78*9c5db199SXin Li @param interface_name string name of the device to query (e.g., "eth0"). 79*9c5db199SXin Li 80*9c5db199SXin Li @return list of objects representing DBus IPConfig RPC endpoints. 81*9c5db199SXin Li 82*9c5db199SXin Li """ 83*9c5db199SXin Li device = self.get_device(interface_name) 84*9c5db199SXin Li if device is None: 85*9c5db199SXin Li return [] 86*9c5db199SXin Li 87*9c5db199SXin Li if six.PY2: 88*9c5db199SXin Li device_properties = device.GetProperties(utf8_strings=True) 89*9c5db199SXin Li else: 90*9c5db199SXin Li device_properties = device.GetProperties() 91*9c5db199SXin Li proxy = self.shill_proxy 92*9c5db199SXin Li 93*9c5db199SXin Li ipconfig_object = proxy.DBUS_TYPE_IPCONFIG 94*9c5db199SXin Li return list(filter(bool, 95*9c5db199SXin Li [ proxy.get_dbus_object(ipconfig_object, property_path) 96*9c5db199SXin Li for property_path in device_properties['IPConfigs'] ])) 97*9c5db199SXin Li 98*9c5db199SXin Li 99*9c5db199SXin Li def get_interface_ipconfig(self, interface_name): 100*9c5db199SXin Li """ 101*9c5db199SXin Li Returns a dictionary containing settings for an |interface_name| set 102*9c5db199SXin Li via DHCPv6. Returns None if no such interface or setting bundle on 103*9c5db199SXin Li that interface can be found in shill. 104*9c5db199SXin Li 105*9c5db199SXin Li @param interface_name string name of the device to query (e.g., "eth0"). 106*9c5db199SXin Li 107*9c5db199SXin Li @return dict containing the the properties of the IPConfig stripped 108*9c5db199SXin Li of DBus meta-data or None. 109*9c5db199SXin Li 110*9c5db199SXin Li """ 111*9c5db199SXin Li dhcp_properties = None 112*9c5db199SXin Li for ipconfig in self.get_interface_ipconfig_objects(interface_name): 113*9c5db199SXin Li logging.info('Looking at ipconfig %r', ipconfig) 114*9c5db199SXin Li if six.PY2: 115*9c5db199SXin Li ipconfig_properties = ipconfig.GetProperties(utf8_strings=True) 116*9c5db199SXin Li else: 117*9c5db199SXin Li ipconfig_properties = ipconfig.GetProperties() 118*9c5db199SXin Li if 'Method' not in ipconfig_properties: 119*9c5db199SXin Li logging.info('Found ipconfig object with no method field') 120*9c5db199SXin Li continue 121*9c5db199SXin Li if ipconfig_properties['Method'] != 'dhcp6': 122*9c5db199SXin Li logging.info('Found ipconfig object with method != dhcp6') 123*9c5db199SXin Li continue 124*9c5db199SXin Li if dhcp_properties != None: 125*9c5db199SXin Li raise error.TestFail('Found multiple ipconfig objects ' 126*9c5db199SXin Li 'with method == dhcp6') 127*9c5db199SXin Li dhcp_properties = ipconfig_properties 128*9c5db199SXin Li if dhcp_properties is None: 129*9c5db199SXin Li logging.info('Did not find IPConfig object with method == dhcp6') 130*9c5db199SXin Li return None 131*9c5db199SXin Li logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties) 132*9c5db199SXin Li return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties) 133*9c5db199SXin Li 134*9c5db199SXin Li 135*9c5db199SXin Li def run_once(self): 136*9c5db199SXin Li self._server = None 137*9c5db199SXin Li self._server_ip = None 138*9c5db199SXin Li self._ethernet_pair = None 139*9c5db199SXin Li self._shill_proxy = shill_proxy.ShillProxy() 140*9c5db199SXin Li try: 141*9c5db199SXin Li # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting 142*9c5db199SXin Li # shill with appropriate command line options or via a new DBUS 143*9c5db199SXin Li # command. 144*9c5db199SXin Li self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair( 145*9c5db199SXin Li interface_ip=None, 146*9c5db199SXin Li peer_interface_name='pseudoethernet0', 147*9c5db199SXin Li peer_interface_ip=None, 148*9c5db199SXin Li interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS) 149*9c5db199SXin Li self._ethernet_pair.setup() 150*9c5db199SXin Li if not self._ethernet_pair.is_healthy: 151*9c5db199SXin Li raise error.TestFail('Could not create virtual ethernet pair.') 152*9c5db199SXin Li self._server_ip = self._ethernet_pair.interface_ip 153*9c5db199SXin Li self._server = dhcpv6_test_server.Dhcpv6TestServer( 154*9c5db199SXin Li self._ethernet_pair.interface_name) 155*9c5db199SXin Li self._server.start() 156*9c5db199SXin Li self.test_body() 157*9c5db199SXin Li except (error.TestFail, error.TestNAError): 158*9c5db199SXin Li # Pass these through without modification. 159*9c5db199SXin Li raise 160*9c5db199SXin Li except Exception as e: 161*9c5db199SXin Li logging.error('Caught exception: %s.', str(e)) 162*9c5db199SXin Li logging.error('Trace: %s', traceback.format_exc()) 163*9c5db199SXin Li raise error.TestFail('Caught exception: %s.' % str(e)) 164*9c5db199SXin Li finally: 165*9c5db199SXin Li if self._server is not None: 166*9c5db199SXin Li self._server.stop() 167*9c5db199SXin Li if self._ethernet_pair is not None: 168*9c5db199SXin Li self._ethernet_pair.teardown() 169*9c5db199SXin Li 170*9c5db199SXin Li def test_body(self): 171*9c5db199SXin Li """ 172*9c5db199SXin Li Override this method with the body of your test. You may safely assume 173*9c5db199SXin Li that the the properties exposed by DhcpTestBase correctly return 174*9c5db199SXin Li references to the test apparatus. 175*9c5db199SXin Li """ 176*9c5db199SXin Li raise error.TestFail('No test body implemented') 177*9c5db199SXin Li 178*9c5db199SXin Li @property 179*9c5db199SXin Li def server_ip(self): 180*9c5db199SXin Li """ 181*9c5db199SXin Li Return the IP address of the side of the interface that the DHCPv6 test 182*9c5db199SXin Li server is bound to. The server itself is bound the the broadcast 183*9c5db199SXin Li address on the interface. 184*9c5db199SXin Li """ 185*9c5db199SXin Li return self._server_ip 186*9c5db199SXin Li 187*9c5db199SXin Li @property 188*9c5db199SXin Li def server(self): 189*9c5db199SXin Li """ 190*9c5db199SXin Li Returns a reference to the DHCP test server. Use this to add handlers 191*9c5db199SXin Li and run tests. 192*9c5db199SXin Li """ 193*9c5db199SXin Li return self._server 194*9c5db199SXin Li 195*9c5db199SXin Li @property 196*9c5db199SXin Li def ethernet_pair(self): 197*9c5db199SXin Li """ 198*9c5db199SXin Li Returns a reference to the virtual ethernet pair created to run DHCP 199*9c5db199SXin Li tests on. 200*9c5db199SXin Li """ 201*9c5db199SXin Li return self._ethernet_pair 202*9c5db199SXin Li 203*9c5db199SXin Li @property 204*9c5db199SXin Li def shill_proxy(self): 205*9c5db199SXin Li """ 206*9c5db199SXin Li Returns a the shill proxy instance. 207*9c5db199SXin Li """ 208*9c5db199SXin Li return self._shill_proxy 209*9c5db199SXin Li 210*9c5db199SXin Li 211*9c5db199SXin Li def check_dhcpv6_config(self): 212*9c5db199SXin Li """ 213*9c5db199SXin Li Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure 214*9c5db199SXin Li that the DUT attained the correct values. 215*9c5db199SXin Li 216*9c5db199SXin Li """ 217*9c5db199SXin Li # Retrieve DHCPv6 configuration. 218*9c5db199SXin Li for attempt in range(IPCONFIG_POLL_COUNT): 219*9c5db199SXin Li dhcpv6_config = self.get_interface_ipconfig( 220*9c5db199SXin Li self.ethernet_pair.peer_interface_name) 221*9c5db199SXin Li # Wait until both IP address and delegated prefix are obtained. 222*9c5db199SXin Li if (dhcpv6_config is not None and 223*9c5db199SXin Li dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and 224*9c5db199SXin Li dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)): 225*9c5db199SXin Li break; 226*9c5db199SXin Li time.sleep(IPCONFIG_POLL_PERIOD_SECONDS) 227*9c5db199SXin Li else: 228*9c5db199SXin Li raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object ' 229*9c5db199SXin Li 'from shill.') 230*9c5db199SXin Li 231*9c5db199SXin Li # Verify Non-temporary Address prefix. 232*9c5db199SXin Li address = dhcpv6_config.get(DHCPV6_KEY_ADDRESS) 233*9c5db199SXin Li actual_prefix = address[:address.index('::')] 234*9c5db199SXin Li expected_prefix = dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX[: 235*9c5db199SXin Li dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX.index('::')] 236*9c5db199SXin Li if actual_prefix != expected_prefix: 237*9c5db199SXin Li raise error.TestFail('Address prefix mismatch: ' 238*9c5db199SXin Li 'actual %s expected %s.' % 239*9c5db199SXin Li (actual_prefix, expected_prefix)) 240*9c5db199SXin Li # Verify Non-temporary Address suffix. 241*9c5db199SXin Li actual_suffix = int(address[address.index('::')+2:], 16) 242*9c5db199SXin Li if (actual_suffix < dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW or 243*9c5db199SXin Li actual_suffix > dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH): 244*9c5db199SXin Li raise error.TestFail('Invalid address suffix: ' 245*9c5db199SXin Li 'actual %x expected (%x-%x)' % 246*9c5db199SXin Li (actual_suffix, 247*9c5db199SXin Li dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW, 248*9c5db199SXin Li dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH)) 249*9c5db199SXin Li 250*9c5db199SXin Li # Verify delegated prefix. 251*9c5db199SXin Li delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX) 252*9c5db199SXin Li for x in range( 253*9c5db199SXin Li dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW, 254*9c5db199SXin Li dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH+1): 255*9c5db199SXin Li valid_prefix = \ 256*9c5db199SXin Li dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT % x 257*9c5db199SXin Li if delegated_prefix == valid_prefix: 258*9c5db199SXin Li break; 259*9c5db199SXin Li else: 260*9c5db199SXin Li raise error.TestFail('Invalid delegated prefix: %s' % 261*9c5db199SXin Li (delegated_prefix)) 262*9c5db199SXin Li # Verify delegated prefix length. 263*9c5db199SXin Li delegated_prefix_length = \ 264*9c5db199SXin Li int(dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX_LENGTH)) 265*9c5db199SXin Li expected_delegated_prefix_length = \ 266*9c5db199SXin Li dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH 267*9c5db199SXin Li if delegated_prefix_length != expected_delegated_prefix_length: 268*9c5db199SXin Li raise error.TestFail('Delegated prefix length mismatch: ' 269*9c5db199SXin Li 'actual %d expected %d' % 270*9c5db199SXin Li (delegated_prefix_length, 271*9c5db199SXin Li expected_delegated_prefix_length)) 272*9c5db199SXin Li 273*9c5db199SXin Li # Verify name servers. 274*9c5db199SXin Li actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS) 275*9c5db199SXin Li expected_name_servers = \ 276*9c5db199SXin Li dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(',') 277*9c5db199SXin Li if actual_name_servers != expected_name_servers: 278*9c5db199SXin Li raise error.TestFail('Name servers mismatch: actual %r expected %r' 279*9c5db199SXin Li % (actual_name_servers, expected_name_servers)) 280*9c5db199SXin Li # Verify domain search. 281*9c5db199SXin Li actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST) 282*9c5db199SXin Li expected_domain_search = \ 283*9c5db199SXin Li dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(',') 284*9c5db199SXin Li if actual_domain_search != expected_domain_search: 285*9c5db199SXin Li raise error.TestFail('Domain search list mismatch: ' 286*9c5db199SXin Li 'actual %r expected %r' % 287*9c5db199SXin Li (actual_domain_search, expected_domain_search)) 288