1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6""" 7Base class for DHCP tests. This class just sets up a little bit of plumbing, 8like a virtual ethernet device with one end that looks like a real ethernet 9device to shill and a DHCP test server on the end that doesn't look like a real 10ethernet interface to shill. Child classes should override test_body() with the 11logic of their test. The plumbing of DhcpTestBase is accessible via properties. 12""" 13 14from __future__ import absolute_import 15from __future__ import division 16from __future__ import print_function 17 18import logging 19import six 20from six.moves import filter 21from six.moves import range 22import socket 23import struct 24import time 25import traceback 26 27from autotest_lib.client.bin import test 28from autotest_lib.client.common_lib import error 29from autotest_lib.client.common_lib.cros import virtual_ethernet_pair 30from autotest_lib.client.cros import dhcp_handling_rule 31from autotest_lib.client.cros import dhcp_packet 32from autotest_lib.client.cros import dhcp_test_server 33from autotest_lib.client.cros.networking import shill_proxy 34 35 36# These are keys that may be used with the DBus dictionary returned from 37# DhcpTestBase.get_interface_ipconfig(). 38DHCPCD_KEY_NAMESERVERS = 'NameServers' 39DHCPCD_KEY_GATEWAY = 'Gateway' 40DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast' 41DHCPCD_KEY_ADDRESS = 'Address' 42DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen' 43DHCPCD_KEY_DOMAIN_NAME = 'DomainName' 44DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname' 45DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains' 46 47# We should be able to complete a DHCP negotiation in this amount of time. 48DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10 49 50# After DHCP completes, an ipconfig should appear shortly after 51IPCONFIG_POLL_COUNT = 5 52IPCONFIG_POLL_PERIOD_SECONDS = 0.5 53 54class DhcpTestBase(test.test): 55 """Parent class for tests that work verify DHCP behavior.""" 56 version = 1 57 58 def __init__(self, job, bindir, outputdir, namespace='autotest'): 59 test.test.__init__(self, job, bindir, outputdir) 60 self._namespace = namespace 61 62 @staticmethod 63 def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix): 64 """ 65 Create a new IPv4 address in a subnet by bitwise and'ing an existing 66 address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in 67 |ip_suffix|. For safety, bitwise or the suffix with the complement of 68 the subnet mask. 69 70 Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105") 71 72 The example usage will return "192.168.1.105". 73 74 @param subnet_mask string subnet mask, e.g. "255.255.255.0" 75 @param ip_in_subnet string an IP address in the desired subnet 76 @param ip_suffix string suffix desired for new address, e.g. "0.0.0.105" 77 78 @return string IP address on in the same subnet with specified suffix. 79 80 """ 81 mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0] 82 subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0] 83 suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0] 84 return socket.inet_ntoa(struct.pack('!I', (subnet | suffix))) 85 86 87 def get_device(self, interface_name): 88 """Finds the corresponding Device object for an interface with 89 the name |interface_name|. 90 91 @param interface_name string The name of the interface to check. 92 93 @return DBus interface object representing the associated device. 94 95 """ 96 return self.shill_proxy.find_object('Device', 97 {'Name': interface_name}) 98 99 100 def find_ethernet_service(self, interface_name): 101 """Finds the corresponding service object for an Ethernet interface. 102 103 @param interface_name string The name of the associated interface 104 105 @return Service object representing the associated service. 106 107 """ 108 device = self.get_device(interface_name) 109 device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path) 110 return self.shill_proxy.find_object('Service', {'Device': device_path}) 111 112 113 def get_interface_ipconfig_objects(self, interface_name): 114 """ 115 Returns a list of dbus object proxies for |interface_name|. 116 Returns an empty list if no such interface exists. 117 118 @param interface_name string name of the device to query (e.g., "eth0"). 119 120 @return list of objects representing DBus IPConfig RPC endpoints. 121 122 """ 123 device = self.get_device(interface_name) 124 if device is None: 125 return [] 126 127 if six.PY2: 128 device_properties = device.GetProperties(utf8_strings=True) 129 else: 130 device_properties = device.GetProperties() 131 proxy = self.shill_proxy 132 133 ipconfig_object = proxy.DBUS_TYPE_IPCONFIG 134 return list(filter(bool, 135 [ proxy.get_dbus_object(ipconfig_object, property_path) 136 for property_path in device_properties['IPConfigs'] ])) 137 138 139 def get_interface_ipconfig(self, interface_name): 140 """ 141 Returns a dictionary containing settings for an |interface_name| set 142 via DHCP. Returns None if no such interface or setting bundle on 143 that interface can be found in shill. 144 145 @param interface_name string name of the device to query (e.g., "eth0"). 146 147 @return dict containing the the properties of the IPConfig stripped 148 of DBus meta-data or None. 149 150 """ 151 dhcp_properties = None 152 for ipconfig in self.get_interface_ipconfig_objects(interface_name): 153 logging.info('Looking at ipconfig %r', ipconfig) 154 if six.PY2: 155 ipconfig_properties = ipconfig.GetProperties(utf8_strings=True) 156 else: 157 ipconfig_properties = ipconfig.GetProperties() 158 if 'Method' not in ipconfig_properties: 159 logging.info('Found ipconfig object with no method field') 160 continue 161 if ipconfig_properties['Method'] != 'dhcp': 162 logging.info('Found ipconfig object with method != dhcp') 163 continue 164 if dhcp_properties != None: 165 raise error.TestFail('Found multiple ipconfig objects ' 166 'with method == dhcp') 167 dhcp_properties = ipconfig_properties 168 if dhcp_properties is None: 169 logging.info('Did not find IPConfig object with method == dhcp') 170 return None 171 logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties) 172 return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties) 173 174 175 def run_once(self): 176 self._server = None 177 self._server_ip = None 178 self._ethernet_pair = None 179 self._server = None 180 self._shill_proxy = shill_proxy.ShillProxy() 181 try: 182 self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair( 183 interface_ns=self._namespace, 184 peer_interface_name='pseudoethernet0', 185 peer_interface_ip=None) 186 self._ethernet_pair.setup() 187 if not self._ethernet_pair.is_healthy: 188 raise error.TestFail('Could not create virtual ethernet pair.') 189 self._server_ip = self._ethernet_pair.interface_ip 190 self._server = dhcp_test_server.DhcpTestServer( 191 interface=self._ethernet_pair.interface_name, 192 ingress_address='', 193 namespace=self._namespace) 194 self._server.start() 195 if not self._server.is_healthy: 196 raise error.TestFail('Could not start DHCP test server.') 197 self._subnet_mask = self._ethernet_pair.interface_subnet_mask 198 self.test_body() 199 except (error.TestFail, error.TestNAError): 200 # Pass these through without modification. 201 raise 202 except Exception as e: 203 logging.error('Caught exception: %s.', str(e)) 204 logging.error('Trace: %s', traceback.format_exc()) 205 raise error.TestFail('Caught exception: %s.' % str(e)) 206 finally: 207 if self._server is not None: 208 self._server.stop() 209 if self._ethernet_pair is not None: 210 self._ethernet_pair.teardown() 211 212 def test_body(self): 213 """ 214 Override this method with the body of your test. You may safely assume 215 that the the properties exposed by DhcpTestBase correctly return 216 references to the test apparatus. 217 """ 218 raise error.TestFail('No test body implemented') 219 220 @property 221 def server_ip(self): 222 """ 223 Return the IP address of the side of the interface that the DHCP test 224 server is bound to. The server itself is bound the the broadcast 225 address on the interface. 226 """ 227 return self._server_ip 228 229 @property 230 def server(self): 231 """ 232 Returns a reference to the DHCP test server. Use this to add handlers 233 and run tests. 234 """ 235 return self._server 236 237 @property 238 def ethernet_pair(self): 239 """ 240 Returns a reference to the virtual ethernet pair created to run DHCP 241 tests on. 242 """ 243 return self._ethernet_pair 244 245 @property 246 def shill_proxy(self): 247 """ 248 Returns a the shill proxy instance. 249 """ 250 return self._shill_proxy 251 252 def negotiate_and_check_lease(self, 253 dhcp_options, 254 custom_fields={}, 255 disable_check=False): 256 """ 257 Perform DHCP lease negotiation, and ensure that the resulting 258 ipconfig matches the DHCP options provided to the server. 259 260 @param dhcp_options dict of properties the DHCP server should provide. 261 @param custom_fields dict of custom DHCP parameters to add to server. 262 @param disable_check bool whether to perform IPConfig parameter 263 checking. 264 265 """ 266 if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options: 267 raise error.TestFail('You must specify OPTION_REQUESTED_IP to ' 268 'negotiate a DHCP lease') 269 intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP] 270 # Build up the handling rules for the server and start the test. 271 rules = [] 272 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 273 intended_ip, 274 self.server_ip, 275 dhcp_options, 276 custom_fields)) 277 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 278 intended_ip, 279 self.server_ip, 280 dhcp_options, 281 custom_fields)) 282 rules[-1].is_final_handler = True 283 self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS) 284 logging.info('Server is negotiating new lease with options: %s', 285 dhcp_options) 286 self.server.wait_for_test_to_finish() 287 if not self.server.last_test_passed: 288 raise error.TestFail( 289 'Test failed: active rule is %s' % self.server.current_rule) 290 291 if disable_check: 292 logging.info('Skipping check of negotiated DHCP lease parameters.') 293 else: 294 self.wait_for_dhcp_propagation() 295 self.check_dhcp_config(dhcp_options) 296 297 def wait_for_dhcp_propagation(self): 298 """ 299 Wait for configuration to propagate over dbus to shill. 300 TODO(wiley) Make this event based. This is pretty sloppy. 301 """ 302 time.sleep(0.1) 303 304 def check_dhcp_config(self, dhcp_options): 305 """ 306 Compare the DHCP ipconfig with DHCP lease parameters to ensure 307 that the DUT attained the correct values. 308 309 @param dhcp_options dict of properties the DHCP server provided. 310 311 """ 312 # The config is what the interface was actually configured with, as 313 # opposed to dhcp_options, which is what the server expected it be 314 # configured with. 315 for attempt in range(IPCONFIG_POLL_COUNT): 316 dhcp_config = self.get_interface_ipconfig( 317 self.ethernet_pair.peer_interface_name) 318 if dhcp_config is not None: 319 break 320 time.sleep(IPCONFIG_POLL_PERIOD_SECONDS) 321 else: 322 raise error.TestFail('Failed to retrieve DHCP ipconfig object ' 323 'from shill.') 324 325 logging.debug('Got DHCP config: %s', str(dhcp_config)) 326 expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP) 327 configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS) 328 if expected_address != configured_address: 329 raise error.TestFail('Interface configured with IP address not ' 330 'granted by the DHCP server after DHCP ' 331 'negotiation. Expected %s but got %s.' % 332 (expected_address, configured_address)) 333 334 # While DNS related settings only propagate to the system when the 335 # service is marked as the default service, we can still check the 336 # IP address on the interface, since that is set immediately. 337 interface_address = self.ethernet_pair.peer_interface_ip 338 if expected_address != interface_address: 339 raise error.TestFail('shill somehow knew about the proper DHCP ' 340 'assigned address: %s, but configured the ' 341 'interface with something completely ' 342 'different: %s.' % 343 (expected_address, interface_address)) 344 345 expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS) 346 configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS) 347 if (expected_dns_servers is not None and 348 expected_dns_servers != configured_dns_servers): 349 raise error.TestFail('Expected to be configured with DNS server ' 350 'list %s, but was configured with %s ' 351 'instead.' % (expected_dns_servers, 352 configured_dns_servers)) 353 354 expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME) 355 configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME) 356 if (expected_domain_name is not None and 357 expected_domain_name != configured_domain_name): 358 raise error.TestFail('Expected to be configured with domain ' 359 'name %s, but got %s instead.' % 360 (expected_domain_name, configured_domain_name)) 361 362 expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME) 363 configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME) 364 if (expected_host_name is not None and 365 expected_host_name != configured_host_name): 366 raise error.TestFail('Expected to be configured with host ' 367 'name %s, but got %s instead.' % 368 (expected_host_name, configured_host_name)) 369 370 expected_search_list = dhcp_options.get( 371 dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST) 372 configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST) 373 if (expected_search_list is not None and 374 expected_search_list != configured_search_list): 375 raise error.TestFail('Expected to be configured with domain ' 376 'search list %s, but got %s instead.' % 377 (expected_search_list, configured_search_list)) 378 379 expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS) 380 if (not expected_routers and 381 dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)): 382 classless_static_routes = dhcp_options[ 383 dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES] 384 for prefix, destination, gateway in classless_static_routes: 385 if not prefix: 386 logging.info('Using %s as the default gateway', gateway) 387 expected_routers = [ gateway ] 388 break 389 configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY) 390 if expected_routers and expected_routers[0] != configured_router: 391 raise error.TestFail('Expected to be configured with gateway %s, ' 392 'but got %s instead.' % 393 (expected_routers[0], configured_router)) 394 395 self.server.wait_for_test_to_finish() 396 if not self.server.last_test_passed: 397 raise error.TestFail('Test server didn\'t get all the messages it ' 398 'was told to expect for renewal.') 399