1*9c5db199SXin Li#!/usr/bin/python3 2*9c5db199SXin Li 3*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be 5*9c5db199SXin Li# found in the LICENSE file. 6*9c5db199SXin Li 7*9c5db199SXin Lifrom __future__ import absolute_import 8*9c5db199SXin Lifrom __future__ import division 9*9c5db199SXin Lifrom __future__ import print_function 10*9c5db199SXin Li 11*9c5db199SXin Liimport logging 12*9c5db199SXin Lifrom six.moves import range 13*9c5db199SXin Liimport socket 14*9c5db199SXin Liimport sys 15*9c5db199SXin Liimport time 16*9c5db199SXin Li 17*9c5db199SXin Liimport common 18*9c5db199SXin Li 19*9c5db199SXin Lifrom autotest_lib.client.cros import dhcp_handling_rule 20*9c5db199SXin Lifrom autotest_lib.client.cros import dhcp_packet 21*9c5db199SXin Lifrom autotest_lib.client.cros import dhcp_test_server 22*9c5db199SXin Li 23*9c5db199SXin LiTEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/" 24*9c5db199SXin Li 25*9c5db199SXin LiTEST_CLASSLESS_STATIC_ROUTE_DATA = \ 26*9c5db199SXin Li b"\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \ 27*9c5db199SXin Li b"\x00\xc0\xa8\x00\xfe" 28*9c5db199SXin Li 29*9c5db199SXin LiTEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [ 30*9c5db199SXin Li (18, "10.9.192.0", "172.31.155.10"), 31*9c5db199SXin Li (0, "0.0.0.0", "192.168.0.254") 32*9c5db199SXin Li ] 33*9c5db199SXin Li 34*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST_COMPRESSED = \ 35*9c5db199SXin Li b"\x03eng\x06google\x03com\x00\x09marketing\xC0\x04" 36*9c5db199SXin Li 37*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com") 38*9c5db199SXin Li 39*9c5db199SXin Li# At this time, we don't support the compression allowed in the RFC. 40*9c5db199SXin Li# This is correct and sufficient for our purposes. 41*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST_EXPECTED = \ 42*9c5db199SXin Li b"\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00" 43*9c5db199SXin Li 44*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST1 = \ 45*9c5db199SXin Li b"w\x10\x03eng\x06google\x03com\x00" 46*9c5db199SXin Li 47*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST2 = \ 48*9c5db199SXin Li b"w\x16\x09marketing\x06google\x03com\x00" 49*9c5db199SXin Li 50*9c5db199SXin Li 51*9c5db199SXin Lidef bin2hex(byte_str, justification=20): 52*9c5db199SXin Li """ 53*9c5db199SXin Li Turn big hex strings into prettier strings of hex bytes. Group those hex 54*9c5db199SXin Li bytes into lines justification bytes long. 55*9c5db199SXin Li """ 56*9c5db199SXin Li chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str] 57*9c5db199SXin Li groups = [] 58*9c5db199SXin Li for i in range(0, len(chars), justification): 59*9c5db199SXin Li groups.append("".join(chars[i:i+justification])) 60*9c5db199SXin Li return "\n".join(groups) 61*9c5db199SXin Li 62*9c5db199SXin Lidef test_packet_serialization(): 63*9c5db199SXin Li log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb") 64*9c5db199SXin Li binary_discovery_packet = log_file.read() 65*9c5db199SXin Li log_file.close() 66*9c5db199SXin Li discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet) 67*9c5db199SXin Li if not discovery_packet.is_valid: 68*9c5db199SXin Li return False 69*9c5db199SXin Li generated_string = discovery_packet.to_binary_string() 70*9c5db199SXin Li if generated_string is None: 71*9c5db199SXin Li print("Failed to generate string from packet object.") 72*9c5db199SXin Li return False 73*9c5db199SXin Li if generated_string != binary_discovery_packet: 74*9c5db199SXin Li print("Packets didn't match: ") 75*9c5db199SXin Li print("Generated: \n%s" % bin2hex(generated_string)) 76*9c5db199SXin Li print("Expected: \n%s" % bin2hex(binary_discovery_packet)) 77*9c5db199SXin Li return False 78*9c5db199SXin Li print("test_packet_serialization PASSED") 79*9c5db199SXin Li return True 80*9c5db199SXin Li 81*9c5db199SXin Lidef test_classless_static_route_parsing(): 82*9c5db199SXin Li parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack( 83*9c5db199SXin Li TEST_CLASSLESS_STATIC_ROUTE_DATA) 84*9c5db199SXin Li if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED: 85*9c5db199SXin Li print("Parsed binary domain list and got %s but expected %s" % 86*9c5db199SXin Li (repr(parsed_routes), 87*9c5db199SXin Li repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED))) 88*9c5db199SXin Li return False 89*9c5db199SXin Li print("test_classless_static_route_parsing PASSED") 90*9c5db199SXin Li return True 91*9c5db199SXin Li 92*9c5db199SXin Lidef test_classless_static_route_serialization(): 93*9c5db199SXin Li byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack( 94*9c5db199SXin Li TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED) 95*9c5db199SXin Li if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA: 96*9c5db199SXin Li # Turn the strings into printable hex strings on a single line. 97*9c5db199SXin Li pretty_actual = bin2hex(byte_string, 100) 98*9c5db199SXin Li pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100) 99*9c5db199SXin Li print("Expected to serialize %s to %s but instead got %s." % 100*9c5db199SXin Li (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected, 101*9c5db199SXin Li pretty_actual)) 102*9c5db199SXin Li return False 103*9c5db199SXin Li print("test_classless_static_route_serialization PASSED") 104*9c5db199SXin Li return True 105*9c5db199SXin Li 106*9c5db199SXin Lidef test_domain_search_list_parsing(): 107*9c5db199SXin Li parsed_domains = dhcp_packet.DomainListOption.unpack( 108*9c5db199SXin Li TEST_DOMAIN_SEARCH_LIST_COMPRESSED) 109*9c5db199SXin Li # Order matters too. 110*9c5db199SXin Li parsed_domains = tuple(parsed_domains) 111*9c5db199SXin Li if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED: 112*9c5db199SXin Li print("Parsed binary domain list and got %s but expected %s" % 113*9c5db199SXin Li (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED)) 114*9c5db199SXin Li return False 115*9c5db199SXin Li print("test_domain_search_list_parsing PASSED") 116*9c5db199SXin Li return True 117*9c5db199SXin Li 118*9c5db199SXin Lidef test_domain_search_list_serialization(): 119*9c5db199SXin Li byte_string = dhcp_packet.DomainListOption.pack( 120*9c5db199SXin Li TEST_DOMAIN_SEARCH_LIST_PARSED) 121*9c5db199SXin Li if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED: 122*9c5db199SXin Li # Turn the strings into printable hex strings on a single line. 123*9c5db199SXin Li pretty_actual = bin2hex(byte_string, 100) 124*9c5db199SXin Li pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100) 125*9c5db199SXin Li print("Expected to serialize %s to %s but instead got %s." % 126*9c5db199SXin Li (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual)) 127*9c5db199SXin Li return False 128*9c5db199SXin Li print("test_domain_search_list_serialization PASSED") 129*9c5db199SXin Li return True 130*9c5db199SXin Li 131*9c5db199SXin Lidef test_broken_domain_search_list_parsing(): 132*9c5db199SXin Li byte_string = b'\x00' * 240 + TEST_DOMAIN_SEARCH_LIST1 + TEST_DOMAIN_SEARCH_LIST2 + b'\xff' 133*9c5db199SXin Li packet = dhcp_packet.DhcpPacket(byte_str=byte_string) 134*9c5db199SXin Li if len(packet._options) != 1: 135*9c5db199SXin Li print("Expected domain list of length 1") 136*9c5db199SXin Li return False 137*9c5db199SXin Li for k, v in packet._options.items(): 138*9c5db199SXin Li if tuple(v) != TEST_DOMAIN_SEARCH_LIST_PARSED: 139*9c5db199SXin Li print("Expected binary domain list and got %s but expected %s" % 140*9c5db199SXin Li (tuple(v), TEST_DOMAIN_SEARCH_LIST_PARSED)) 141*9c5db199SXin Li return False 142*9c5db199SXin Li print("test_broken_domain_search_list_parsing PASSED") 143*9c5db199SXin Li return True 144*9c5db199SXin Li 145*9c5db199SXin Lidef receive_packet(a_socket, timeout_seconds=1.0): 146*9c5db199SXin Li data = None 147*9c5db199SXin Li start_time = time.time() 148*9c5db199SXin Li while data is None and start_time + timeout_seconds > time.time(): 149*9c5db199SXin Li try: 150*9c5db199SXin Li data, _ = a_socket.recvfrom(1024) 151*9c5db199SXin Li except socket.timeout: 152*9c5db199SXin Li pass # We expect many timeouts. 153*9c5db199SXin Li if data is None: 154*9c5db199SXin Li print("Timed out before we received a response from the server.") 155*9c5db199SXin Li return None 156*9c5db199SXin Li 157*9c5db199SXin Li print("Client received a packet of length %d from the server." % len(data)) 158*9c5db199SXin Li packet = dhcp_packet.DhcpPacket(byte_str=data) 159*9c5db199SXin Li if not packet.is_valid: 160*9c5db199SXin Li print("Received an invalid response from DHCP server.") 161*9c5db199SXin Li return None 162*9c5db199SXin Li 163*9c5db199SXin Li return packet 164*9c5db199SXin Li 165*9c5db199SXin Lidef test_simple_server_exchange(server): 166*9c5db199SXin Li intended_ip = "127.0.0.42" 167*9c5db199SXin Li subnet_mask = "255.255.255.0" 168*9c5db199SXin Li server_ip = "127.0.0.1" 169*9c5db199SXin Li lease_time_seconds = 60 170*9c5db199SXin Li test_timeout = 3.0 171*9c5db199SXin Li mac_addr = b"\x01\x02\x03\x04\x05\x06" 172*9c5db199SXin Li # Build up our packets and have them request some default option values, 173*9c5db199SXin Li # like the IP we're being assigned and the address of the server assigning 174*9c5db199SXin Li # it. 175*9c5db199SXin Li discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr) 176*9c5db199SXin Li discovery_message.set_option( 177*9c5db199SXin Li dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, 178*9c5db199SXin Li dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) 179*9c5db199SXin Li request_message = dhcp_packet.DhcpPacket.create_request_packet( 180*9c5db199SXin Li discovery_message.transaction_id, 181*9c5db199SXin Li mac_addr) 182*9c5db199SXin Li request_message.set_option( 183*9c5db199SXin Li dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, 184*9c5db199SXin Li dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) 185*9c5db199SXin Li # This is the pool of settings the DHCP server will seem to draw from to 186*9c5db199SXin Li # answer queries from the client. This information is written into packets 187*9c5db199SXin Li # through the handling rules. 188*9c5db199SXin Li dhcp_server_config = { 189*9c5db199SXin Li dhcp_packet.OPTION_SERVER_ID : server_ip, 190*9c5db199SXin Li dhcp_packet.OPTION_SUBNET_MASK : subnet_mask, 191*9c5db199SXin Li dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds, 192*9c5db199SXin Li dhcp_packet.OPTION_REQUESTED_IP : intended_ip, 193*9c5db199SXin Li } 194*9c5db199SXin Li # Build up the handling rules for the server and start the test. 195*9c5db199SXin Li rules = [] 196*9c5db199SXin Li rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 197*9c5db199SXin Li intended_ip, 198*9c5db199SXin Li server_ip, 199*9c5db199SXin Li dhcp_server_config, {})) 200*9c5db199SXin Li rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 201*9c5db199SXin Li intended_ip, 202*9c5db199SXin Li server_ip, 203*9c5db199SXin Li dhcp_server_config, {})) 204*9c5db199SXin Li rules[-1].is_final_handler = True 205*9c5db199SXin Li server.start_test(rules, test_timeout) 206*9c5db199SXin Li # Because we don't want to require root permissions to run these tests, 207*9c5db199SXin Li # listen on the loopback device, don't broadcast, and don't use reserved 208*9c5db199SXin Li # ports (like the actual DHCP ports). Use 8068/8067 instead. 209*9c5db199SXin Li client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 210*9c5db199SXin Li client_socket.bind(("127.0.0.1", 8068)) 211*9c5db199SXin Li client_socket.settimeout(0.1) 212*9c5db199SXin Li client_socket.sendto(discovery_message.to_binary_string(), 213*9c5db199SXin Li (server_ip, 8067)) 214*9c5db199SXin Li 215*9c5db199SXin Li offer_packet = receive_packet(client_socket) 216*9c5db199SXin Li if offer_packet is None: 217*9c5db199SXin Li return False 218*9c5db199SXin Li 219*9c5db199SXin Li if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER): 220*9c5db199SXin Li print("Type of DHCP response is not offer.") 221*9c5db199SXin Li return False 222*9c5db199SXin Li 223*9c5db199SXin Li if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: 224*9c5db199SXin Li print("Server didn't offer the IP we expected.") 225*9c5db199SXin Li return False 226*9c5db199SXin Li 227*9c5db199SXin Li print("Offer looks good to the client, sending request.") 228*9c5db199SXin Li # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages. In 229*9c5db199SXin Li # our unit test, we have to do this ourselves. 230*9c5db199SXin Li request_message.set_option( 231*9c5db199SXin Li dhcp_packet.OPTION_SERVER_ID, 232*9c5db199SXin Li offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID)) 233*9c5db199SXin Li request_message.set_option( 234*9c5db199SXin Li dhcp_packet.OPTION_SUBNET_MASK, 235*9c5db199SXin Li offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK)) 236*9c5db199SXin Li request_message.set_option( 237*9c5db199SXin Li dhcp_packet.OPTION_IP_LEASE_TIME, 238*9c5db199SXin Li offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME)) 239*9c5db199SXin Li request_message.set_option( 240*9c5db199SXin Li dhcp_packet.OPTION_REQUESTED_IP, 241*9c5db199SXin Li offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP)) 242*9c5db199SXin Li # Send the REQUEST message. 243*9c5db199SXin Li client_socket.sendto(request_message.to_binary_string(), 244*9c5db199SXin Li (server_ip, 8067)) 245*9c5db199SXin Li ack_packet = receive_packet(client_socket) 246*9c5db199SXin Li if ack_packet is None: 247*9c5db199SXin Li return False 248*9c5db199SXin Li 249*9c5db199SXin Li if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK): 250*9c5db199SXin Li print("Type of DHCP response is not acknowledgement.") 251*9c5db199SXin Li return False 252*9c5db199SXin Li 253*9c5db199SXin Li if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: 254*9c5db199SXin Li print("Server didn't give us the IP we expected.") 255*9c5db199SXin Li return False 256*9c5db199SXin Li 257*9c5db199SXin Li print("Waiting for the server to finish.") 258*9c5db199SXin Li server.wait_for_test_to_finish() 259*9c5db199SXin Li print("Server agrees that the test is over.") 260*9c5db199SXin Li if not server.last_test_passed: 261*9c5db199SXin Li print("Server is unhappy with the test result.") 262*9c5db199SXin Li return False 263*9c5db199SXin Li 264*9c5db199SXin Li print("test_simple_server_exchange PASSED.") 265*9c5db199SXin Li return True 266*9c5db199SXin Li 267*9c5db199SXin Lidef test_server_dialogue(): 268*9c5db199SXin Li server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1", 269*9c5db199SXin Li ingress_port=8067, 270*9c5db199SXin Li broadcast_address="127.0.0.1", 271*9c5db199SXin Li broadcast_port=8068) 272*9c5db199SXin Li server.start() 273*9c5db199SXin Li ret = False 274*9c5db199SXin Li if server.is_healthy: 275*9c5db199SXin Li ret = test_simple_server_exchange(server) 276*9c5db199SXin Li else: 277*9c5db199SXin Li print("Server isn't healthy, aborting.") 278*9c5db199SXin Li print("Sending server stop() signal.") 279*9c5db199SXin Li server.stop() 280*9c5db199SXin Li print("Stop signal sent.") 281*9c5db199SXin Li return ret 282*9c5db199SXin Li 283*9c5db199SXin Lidef run_tests(): 284*9c5db199SXin Li logger = logging.getLogger("dhcp") 285*9c5db199SXin Li logger.setLevel(logging.DEBUG) 286*9c5db199SXin Li stream_handler = logging.StreamHandler() 287*9c5db199SXin Li stream_handler.setLevel(logging.DEBUG) 288*9c5db199SXin Li logger.addHandler(stream_handler) 289*9c5db199SXin Li retval = test_packet_serialization() 290*9c5db199SXin Li retval &= test_classless_static_route_parsing() 291*9c5db199SXin Li retval &= test_classless_static_route_serialization() 292*9c5db199SXin Li retval &= test_domain_search_list_parsing() 293*9c5db199SXin Li retval &= test_domain_search_list_serialization() 294*9c5db199SXin Li retval &= test_broken_domain_search_list_parsing() 295*9c5db199SXin Li retval &= test_server_dialogue() 296*9c5db199SXin Li if retval: 297*9c5db199SXin Li print("All tests PASSED.") 298*9c5db199SXin Li return 0 299*9c5db199SXin Li else: 300*9c5db199SXin Li print("Some tests FAILED") 301*9c5db199SXin Li return -1 302*9c5db199SXin Li 303*9c5db199SXin Liif __name__ == "__main__": 304*9c5db199SXin Li sys.exit(run_tests()) 305