1#!/usr/bin/python3 2 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7from __future__ import absolute_import 8from __future__ import division 9from __future__ import print_function 10 11import logging 12from six.moves import range 13import socket 14import sys 15import time 16 17import common 18 19from autotest_lib.client.cros import dhcp_handling_rule 20from autotest_lib.client.cros import dhcp_packet 21from autotest_lib.client.cros import dhcp_test_server 22 23TEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/" 24 25TEST_CLASSLESS_STATIC_ROUTE_DATA = \ 26 b"\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \ 27 b"\x00\xc0\xa8\x00\xfe" 28 29TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [ 30 (18, "10.9.192.0", "172.31.155.10"), 31 (0, "0.0.0.0", "192.168.0.254") 32 ] 33 34TEST_DOMAIN_SEARCH_LIST_COMPRESSED = \ 35 b"\x03eng\x06google\x03com\x00\x09marketing\xC0\x04" 36 37TEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com") 38 39# At this time, we don't support the compression allowed in the RFC. 40# This is correct and sufficient for our purposes. 41TEST_DOMAIN_SEARCH_LIST_EXPECTED = \ 42 b"\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00" 43 44TEST_DOMAIN_SEARCH_LIST1 = \ 45 b"w\x10\x03eng\x06google\x03com\x00" 46 47TEST_DOMAIN_SEARCH_LIST2 = \ 48 b"w\x16\x09marketing\x06google\x03com\x00" 49 50 51def bin2hex(byte_str, justification=20): 52 """ 53 Turn big hex strings into prettier strings of hex bytes. Group those hex 54 bytes into lines justification bytes long. 55 """ 56 chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str] 57 groups = [] 58 for i in range(0, len(chars), justification): 59 groups.append("".join(chars[i:i+justification])) 60 return "\n".join(groups) 61 62def test_packet_serialization(): 63 log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb") 64 binary_discovery_packet = log_file.read() 65 log_file.close() 66 discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet) 67 if not discovery_packet.is_valid: 68 return False 69 generated_string = discovery_packet.to_binary_string() 70 if generated_string is None: 71 print("Failed to generate string from packet object.") 72 return False 73 if generated_string != binary_discovery_packet: 74 print("Packets didn't match: ") 75 print("Generated: \n%s" % bin2hex(generated_string)) 76 print("Expected: \n%s" % bin2hex(binary_discovery_packet)) 77 return False 78 print("test_packet_serialization PASSED") 79 return True 80 81def test_classless_static_route_parsing(): 82 parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack( 83 TEST_CLASSLESS_STATIC_ROUTE_DATA) 84 if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED: 85 print("Parsed binary domain list and got %s but expected %s" % 86 (repr(parsed_routes), 87 repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED))) 88 return False 89 print("test_classless_static_route_parsing PASSED") 90 return True 91 92def test_classless_static_route_serialization(): 93 byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack( 94 TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED) 95 if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA: 96 # Turn the strings into printable hex strings on a single line. 97 pretty_actual = bin2hex(byte_string, 100) 98 pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100) 99 print("Expected to serialize %s to %s but instead got %s." % 100 (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected, 101 pretty_actual)) 102 return False 103 print("test_classless_static_route_serialization PASSED") 104 return True 105 106def test_domain_search_list_parsing(): 107 parsed_domains = dhcp_packet.DomainListOption.unpack( 108 TEST_DOMAIN_SEARCH_LIST_COMPRESSED) 109 # Order matters too. 110 parsed_domains = tuple(parsed_domains) 111 if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED: 112 print("Parsed binary domain list and got %s but expected %s" % 113 (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED)) 114 return False 115 print("test_domain_search_list_parsing PASSED") 116 return True 117 118def test_domain_search_list_serialization(): 119 byte_string = dhcp_packet.DomainListOption.pack( 120 TEST_DOMAIN_SEARCH_LIST_PARSED) 121 if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED: 122 # Turn the strings into printable hex strings on a single line. 123 pretty_actual = bin2hex(byte_string, 100) 124 pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100) 125 print("Expected to serialize %s to %s but instead got %s." % 126 (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual)) 127 return False 128 print("test_domain_search_list_serialization PASSED") 129 return True 130 131def test_broken_domain_search_list_parsing(): 132 byte_string = b'\x00' * 240 + TEST_DOMAIN_SEARCH_LIST1 + TEST_DOMAIN_SEARCH_LIST2 + b'\xff' 133 packet = dhcp_packet.DhcpPacket(byte_str=byte_string) 134 if len(packet._options) != 1: 135 print("Expected domain list of length 1") 136 return False 137 for k, v in packet._options.items(): 138 if tuple(v) != TEST_DOMAIN_SEARCH_LIST_PARSED: 139 print("Expected binary domain list and got %s but expected %s" % 140 (tuple(v), TEST_DOMAIN_SEARCH_LIST_PARSED)) 141 return False 142 print("test_broken_domain_search_list_parsing PASSED") 143 return True 144 145def receive_packet(a_socket, timeout_seconds=1.0): 146 data = None 147 start_time = time.time() 148 while data is None and start_time + timeout_seconds > time.time(): 149 try: 150 data, _ = a_socket.recvfrom(1024) 151 except socket.timeout: 152 pass # We expect many timeouts. 153 if data is None: 154 print("Timed out before we received a response from the server.") 155 return None 156 157 print("Client received a packet of length %d from the server." % len(data)) 158 packet = dhcp_packet.DhcpPacket(byte_str=data) 159 if not packet.is_valid: 160 print("Received an invalid response from DHCP server.") 161 return None 162 163 return packet 164 165def test_simple_server_exchange(server): 166 intended_ip = "127.0.0.42" 167 subnet_mask = "255.255.255.0" 168 server_ip = "127.0.0.1" 169 lease_time_seconds = 60 170 test_timeout = 3.0 171 mac_addr = b"\x01\x02\x03\x04\x05\x06" 172 # Build up our packets and have them request some default option values, 173 # like the IP we're being assigned and the address of the server assigning 174 # it. 175 discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr) 176 discovery_message.set_option( 177 dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, 178 dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) 179 request_message = dhcp_packet.DhcpPacket.create_request_packet( 180 discovery_message.transaction_id, 181 mac_addr) 182 request_message.set_option( 183 dhcp_packet.OPTION_PARAMETER_REQUEST_LIST, 184 dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT) 185 # This is the pool of settings the DHCP server will seem to draw from to 186 # answer queries from the client. This information is written into packets 187 # through the handling rules. 188 dhcp_server_config = { 189 dhcp_packet.OPTION_SERVER_ID : server_ip, 190 dhcp_packet.OPTION_SUBNET_MASK : subnet_mask, 191 dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds, 192 dhcp_packet.OPTION_REQUESTED_IP : intended_ip, 193 } 194 # Build up the handling rules for the server and start the test. 195 rules = [] 196 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery( 197 intended_ip, 198 server_ip, 199 dhcp_server_config, {})) 200 rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest( 201 intended_ip, 202 server_ip, 203 dhcp_server_config, {})) 204 rules[-1].is_final_handler = True 205 server.start_test(rules, test_timeout) 206 # Because we don't want to require root permissions to run these tests, 207 # listen on the loopback device, don't broadcast, and don't use reserved 208 # ports (like the actual DHCP ports). Use 8068/8067 instead. 209 client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 210 client_socket.bind(("127.0.0.1", 8068)) 211 client_socket.settimeout(0.1) 212 client_socket.sendto(discovery_message.to_binary_string(), 213 (server_ip, 8067)) 214 215 offer_packet = receive_packet(client_socket) 216 if offer_packet is None: 217 return False 218 219 if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER): 220 print("Type of DHCP response is not offer.") 221 return False 222 223 if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: 224 print("Server didn't offer the IP we expected.") 225 return False 226 227 print("Offer looks good to the client, sending request.") 228 # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages. In 229 # our unit test, we have to do this ourselves. 230 request_message.set_option( 231 dhcp_packet.OPTION_SERVER_ID, 232 offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID)) 233 request_message.set_option( 234 dhcp_packet.OPTION_SUBNET_MASK, 235 offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK)) 236 request_message.set_option( 237 dhcp_packet.OPTION_IP_LEASE_TIME, 238 offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME)) 239 request_message.set_option( 240 dhcp_packet.OPTION_REQUESTED_IP, 241 offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP)) 242 # Send the REQUEST message. 243 client_socket.sendto(request_message.to_binary_string(), 244 (server_ip, 8067)) 245 ack_packet = receive_packet(client_socket) 246 if ack_packet is None: 247 return False 248 249 if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK): 250 print("Type of DHCP response is not acknowledgement.") 251 return False 252 253 if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip: 254 print("Server didn't give us the IP we expected.") 255 return False 256 257 print("Waiting for the server to finish.") 258 server.wait_for_test_to_finish() 259 print("Server agrees that the test is over.") 260 if not server.last_test_passed: 261 print("Server is unhappy with the test result.") 262 return False 263 264 print("test_simple_server_exchange PASSED.") 265 return True 266 267def test_server_dialogue(): 268 server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1", 269 ingress_port=8067, 270 broadcast_address="127.0.0.1", 271 broadcast_port=8068) 272 server.start() 273 ret = False 274 if server.is_healthy: 275 ret = test_simple_server_exchange(server) 276 else: 277 print("Server isn't healthy, aborting.") 278 print("Sending server stop() signal.") 279 server.stop() 280 print("Stop signal sent.") 281 return ret 282 283def run_tests(): 284 logger = logging.getLogger("dhcp") 285 logger.setLevel(logging.DEBUG) 286 stream_handler = logging.StreamHandler() 287 stream_handler.setLevel(logging.DEBUG) 288 logger.addHandler(stream_handler) 289 retval = test_packet_serialization() 290 retval &= test_classless_static_route_parsing() 291 retval &= test_classless_static_route_serialization() 292 retval &= test_domain_search_list_parsing() 293 retval &= test_domain_search_list_serialization() 294 retval &= test_broken_domain_search_list_parsing() 295 retval &= test_server_dialogue() 296 if retval: 297 print("All tests PASSED.") 298 return 0 299 else: 300 print("Some tests FAILED") 301 return -1 302 303if __name__ == "__main__": 304 sys.exit(run_tests()) 305