xref: /aosp_15_r20/external/autotest/client/cros/dhcp_unittest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li#!/usr/bin/python3
2*9c5db199SXin Li
3*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
5*9c5db199SXin Li# found in the LICENSE file.
6*9c5db199SXin Li
7*9c5db199SXin Lifrom __future__ import absolute_import
8*9c5db199SXin Lifrom __future__ import division
9*9c5db199SXin Lifrom __future__ import print_function
10*9c5db199SXin Li
11*9c5db199SXin Liimport logging
12*9c5db199SXin Lifrom six.moves import range
13*9c5db199SXin Liimport socket
14*9c5db199SXin Liimport sys
15*9c5db199SXin Liimport time
16*9c5db199SXin Li
17*9c5db199SXin Liimport common
18*9c5db199SXin Li
19*9c5db199SXin Lifrom autotest_lib.client.cros import dhcp_handling_rule
20*9c5db199SXin Lifrom autotest_lib.client.cros import dhcp_packet
21*9c5db199SXin Lifrom autotest_lib.client.cros import dhcp_test_server
22*9c5db199SXin Li
23*9c5db199SXin LiTEST_DATA_PATH_PREFIX = "client/cros/dhcp_test_data/"
24*9c5db199SXin Li
25*9c5db199SXin LiTEST_CLASSLESS_STATIC_ROUTE_DATA = \
26*9c5db199SXin Li        b"\x12\x0a\x09\xc0\xac\x1f\x9b\x0a" \
27*9c5db199SXin Li        b"\x00\xc0\xa8\x00\xfe"
28*9c5db199SXin Li
29*9c5db199SXin LiTEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED = [
30*9c5db199SXin Li        (18, "10.9.192.0", "172.31.155.10"),
31*9c5db199SXin Li        (0, "0.0.0.0", "192.168.0.254")
32*9c5db199SXin Li        ]
33*9c5db199SXin Li
34*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST_COMPRESSED = \
35*9c5db199SXin Li        b"\x03eng\x06google\x03com\x00\x09marketing\xC0\x04"
36*9c5db199SXin Li
37*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST_PARSED = ("eng.google.com", "marketing.google.com")
38*9c5db199SXin Li
39*9c5db199SXin Li# At this time, we don't support the compression allowed in the RFC.
40*9c5db199SXin Li# This is correct and sufficient for our purposes.
41*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST_EXPECTED = \
42*9c5db199SXin Li        b"\x03eng\x06google\x03com\x00\x09marketing\x06google\x03com\x00"
43*9c5db199SXin Li
44*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST1 = \
45*9c5db199SXin Li        b"w\x10\x03eng\x06google\x03com\x00"
46*9c5db199SXin Li
47*9c5db199SXin LiTEST_DOMAIN_SEARCH_LIST2 = \
48*9c5db199SXin Li        b"w\x16\x09marketing\x06google\x03com\x00"
49*9c5db199SXin Li
50*9c5db199SXin Li
51*9c5db199SXin Lidef bin2hex(byte_str, justification=20):
52*9c5db199SXin Li    """
53*9c5db199SXin Li    Turn big hex strings into prettier strings of hex bytes.  Group those hex
54*9c5db199SXin Li    bytes into lines justification bytes long.
55*9c5db199SXin Li    """
56*9c5db199SXin Li    chars = ["x" + (hex(ord(c))[2:].zfill(2)) for c in byte_str]
57*9c5db199SXin Li    groups = []
58*9c5db199SXin Li    for i in range(0, len(chars), justification):
59*9c5db199SXin Li        groups.append("".join(chars[i:i+justification]))
60*9c5db199SXin Li    return "\n".join(groups)
61*9c5db199SXin Li
62*9c5db199SXin Lidef test_packet_serialization():
63*9c5db199SXin Li    log_file = open(TEST_DATA_PATH_PREFIX + "dhcp_discovery.log", "rb")
64*9c5db199SXin Li    binary_discovery_packet = log_file.read()
65*9c5db199SXin Li    log_file.close()
66*9c5db199SXin Li    discovery_packet = dhcp_packet.DhcpPacket(byte_str=binary_discovery_packet)
67*9c5db199SXin Li    if not discovery_packet.is_valid:
68*9c5db199SXin Li        return False
69*9c5db199SXin Li    generated_string = discovery_packet.to_binary_string()
70*9c5db199SXin Li    if generated_string is None:
71*9c5db199SXin Li        print("Failed to generate string from packet object.")
72*9c5db199SXin Li        return False
73*9c5db199SXin Li    if generated_string != binary_discovery_packet:
74*9c5db199SXin Li        print("Packets didn't match: ")
75*9c5db199SXin Li        print("Generated: \n%s" % bin2hex(generated_string))
76*9c5db199SXin Li        print("Expected: \n%s" % bin2hex(binary_discovery_packet))
77*9c5db199SXin Li        return False
78*9c5db199SXin Li    print("test_packet_serialization PASSED")
79*9c5db199SXin Li    return True
80*9c5db199SXin Li
81*9c5db199SXin Lidef test_classless_static_route_parsing():
82*9c5db199SXin Li    parsed_routes = dhcp_packet.ClasslessStaticRoutesOption.unpack(
83*9c5db199SXin Li            TEST_CLASSLESS_STATIC_ROUTE_DATA)
84*9c5db199SXin Li    if parsed_routes != TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED:
85*9c5db199SXin Li        print("Parsed binary domain list and got %s but expected %s" %
86*9c5db199SXin Li               (repr(parsed_routes),
87*9c5db199SXin Li                repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)))
88*9c5db199SXin Li        return False
89*9c5db199SXin Li    print("test_classless_static_route_parsing PASSED")
90*9c5db199SXin Li    return True
91*9c5db199SXin Li
92*9c5db199SXin Lidef test_classless_static_route_serialization():
93*9c5db199SXin Li    byte_string = dhcp_packet.ClasslessStaticRoutesOption.pack(
94*9c5db199SXin Li            TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED)
95*9c5db199SXin Li    if byte_string != TEST_CLASSLESS_STATIC_ROUTE_DATA:
96*9c5db199SXin Li        # Turn the strings into printable hex strings on a single line.
97*9c5db199SXin Li        pretty_actual = bin2hex(byte_string, 100)
98*9c5db199SXin Li        pretty_expected = bin2hex(TEST_CLASSLESS_STATIC_ROUTE_DATA, 100)
99*9c5db199SXin Li        print("Expected to serialize %s to %s but instead got %s." %
100*9c5db199SXin Li               (repr(TEST_CLASSLESS_STATIC_ROUTE_LIST_PARSED), pretty_expected,
101*9c5db199SXin Li                     pretty_actual))
102*9c5db199SXin Li        return False
103*9c5db199SXin Li    print("test_classless_static_route_serialization PASSED")
104*9c5db199SXin Li    return True
105*9c5db199SXin Li
106*9c5db199SXin Lidef test_domain_search_list_parsing():
107*9c5db199SXin Li    parsed_domains = dhcp_packet.DomainListOption.unpack(
108*9c5db199SXin Li            TEST_DOMAIN_SEARCH_LIST_COMPRESSED)
109*9c5db199SXin Li    # Order matters too.
110*9c5db199SXin Li    parsed_domains = tuple(parsed_domains)
111*9c5db199SXin Li    if parsed_domains != TEST_DOMAIN_SEARCH_LIST_PARSED:
112*9c5db199SXin Li        print("Parsed binary domain list and got %s but expected %s" %
113*9c5db199SXin Li               (parsed_domains, TEST_DOMAIN_SEARCH_LIST_EXPECTED))
114*9c5db199SXin Li        return False
115*9c5db199SXin Li    print("test_domain_search_list_parsing PASSED")
116*9c5db199SXin Li    return True
117*9c5db199SXin Li
118*9c5db199SXin Lidef test_domain_search_list_serialization():
119*9c5db199SXin Li    byte_string = dhcp_packet.DomainListOption.pack(
120*9c5db199SXin Li            TEST_DOMAIN_SEARCH_LIST_PARSED)
121*9c5db199SXin Li    if byte_string != TEST_DOMAIN_SEARCH_LIST_EXPECTED:
122*9c5db199SXin Li        # Turn the strings into printable hex strings on a single line.
123*9c5db199SXin Li        pretty_actual = bin2hex(byte_string, 100)
124*9c5db199SXin Li        pretty_expected = bin2hex(TEST_DOMAIN_SEARCH_LIST_EXPECTED, 100)
125*9c5db199SXin Li        print("Expected to serialize %s to %s but instead got %s." %
126*9c5db199SXin Li               (TEST_DOMAIN_SEARCH_LIST_PARSED, pretty_expected, pretty_actual))
127*9c5db199SXin Li        return False
128*9c5db199SXin Li    print("test_domain_search_list_serialization PASSED")
129*9c5db199SXin Li    return True
130*9c5db199SXin Li
131*9c5db199SXin Lidef test_broken_domain_search_list_parsing():
132*9c5db199SXin Li    byte_string = b'\x00' * 240 + TEST_DOMAIN_SEARCH_LIST1 + TEST_DOMAIN_SEARCH_LIST2 + b'\xff'
133*9c5db199SXin Li    packet = dhcp_packet.DhcpPacket(byte_str=byte_string)
134*9c5db199SXin Li    if len(packet._options) != 1:
135*9c5db199SXin Li        print("Expected domain list of length 1")
136*9c5db199SXin Li        return False
137*9c5db199SXin Li    for k, v in packet._options.items():
138*9c5db199SXin Li        if tuple(v) != TEST_DOMAIN_SEARCH_LIST_PARSED:
139*9c5db199SXin Li            print("Expected binary domain list and got %s but expected %s" %
140*9c5db199SXin Li                    (tuple(v), TEST_DOMAIN_SEARCH_LIST_PARSED))
141*9c5db199SXin Li            return False
142*9c5db199SXin Li    print("test_broken_domain_search_list_parsing PASSED")
143*9c5db199SXin Li    return True
144*9c5db199SXin Li
145*9c5db199SXin Lidef receive_packet(a_socket, timeout_seconds=1.0):
146*9c5db199SXin Li    data = None
147*9c5db199SXin Li    start_time = time.time()
148*9c5db199SXin Li    while data is None and start_time + timeout_seconds > time.time():
149*9c5db199SXin Li        try:
150*9c5db199SXin Li            data, _ = a_socket.recvfrom(1024)
151*9c5db199SXin Li        except socket.timeout:
152*9c5db199SXin Li            pass # We expect many timeouts.
153*9c5db199SXin Li    if data is None:
154*9c5db199SXin Li        print("Timed out before we received a response from the server.")
155*9c5db199SXin Li        return None
156*9c5db199SXin Li
157*9c5db199SXin Li    print("Client received a packet of length %d from the server." % len(data))
158*9c5db199SXin Li    packet = dhcp_packet.DhcpPacket(byte_str=data)
159*9c5db199SXin Li    if not packet.is_valid:
160*9c5db199SXin Li        print("Received an invalid response from DHCP server.")
161*9c5db199SXin Li        return None
162*9c5db199SXin Li
163*9c5db199SXin Li    return packet
164*9c5db199SXin Li
165*9c5db199SXin Lidef test_simple_server_exchange(server):
166*9c5db199SXin Li    intended_ip = "127.0.0.42"
167*9c5db199SXin Li    subnet_mask = "255.255.255.0"
168*9c5db199SXin Li    server_ip = "127.0.0.1"
169*9c5db199SXin Li    lease_time_seconds = 60
170*9c5db199SXin Li    test_timeout = 3.0
171*9c5db199SXin Li    mac_addr = b"\x01\x02\x03\x04\x05\x06"
172*9c5db199SXin Li    # Build up our packets and have them request some default option values,
173*9c5db199SXin Li    # like the IP we're being assigned and the address of the server assigning
174*9c5db199SXin Li    # it.
175*9c5db199SXin Li    discovery_message = dhcp_packet.DhcpPacket.create_discovery_packet(mac_addr)
176*9c5db199SXin Li    discovery_message.set_option(
177*9c5db199SXin Li            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
178*9c5db199SXin Li            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
179*9c5db199SXin Li    request_message = dhcp_packet.DhcpPacket.create_request_packet(
180*9c5db199SXin Li            discovery_message.transaction_id,
181*9c5db199SXin Li            mac_addr)
182*9c5db199SXin Li    request_message.set_option(
183*9c5db199SXin Li            dhcp_packet.OPTION_PARAMETER_REQUEST_LIST,
184*9c5db199SXin Li            dhcp_packet.OPTION_VALUE_PARAMETER_REQUEST_LIST_DEFAULT)
185*9c5db199SXin Li    # This is the pool of settings the DHCP server will seem to draw from to
186*9c5db199SXin Li    # answer queries from the client.  This information is written into packets
187*9c5db199SXin Li    # through the handling rules.
188*9c5db199SXin Li    dhcp_server_config = {
189*9c5db199SXin Li            dhcp_packet.OPTION_SERVER_ID : server_ip,
190*9c5db199SXin Li            dhcp_packet.OPTION_SUBNET_MASK : subnet_mask,
191*9c5db199SXin Li            dhcp_packet.OPTION_IP_LEASE_TIME : lease_time_seconds,
192*9c5db199SXin Li            dhcp_packet.OPTION_REQUESTED_IP : intended_ip,
193*9c5db199SXin Li            }
194*9c5db199SXin Li    # Build up the handling rules for the server and start the test.
195*9c5db199SXin Li    rules = []
196*9c5db199SXin Li    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
197*9c5db199SXin Li            intended_ip,
198*9c5db199SXin Li            server_ip,
199*9c5db199SXin Li            dhcp_server_config, {}))
200*9c5db199SXin Li    rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
201*9c5db199SXin Li            intended_ip,
202*9c5db199SXin Li            server_ip,
203*9c5db199SXin Li            dhcp_server_config, {}))
204*9c5db199SXin Li    rules[-1].is_final_handler = True
205*9c5db199SXin Li    server.start_test(rules, test_timeout)
206*9c5db199SXin Li    # Because we don't want to require root permissions to run these tests,
207*9c5db199SXin Li    # listen on the loopback device, don't broadcast, and don't use reserved
208*9c5db199SXin Li    # ports (like the actual DHCP ports).  Use 8068/8067 instead.
209*9c5db199SXin Li    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
210*9c5db199SXin Li    client_socket.bind(("127.0.0.1", 8068))
211*9c5db199SXin Li    client_socket.settimeout(0.1)
212*9c5db199SXin Li    client_socket.sendto(discovery_message.to_binary_string(),
213*9c5db199SXin Li                         (server_ip, 8067))
214*9c5db199SXin Li
215*9c5db199SXin Li    offer_packet = receive_packet(client_socket)
216*9c5db199SXin Li    if offer_packet is None:
217*9c5db199SXin Li        return False
218*9c5db199SXin Li
219*9c5db199SXin Li    if (offer_packet.message_type != dhcp_packet.MESSAGE_TYPE_OFFER):
220*9c5db199SXin Li        print("Type of DHCP response is not offer.")
221*9c5db199SXin Li        return False
222*9c5db199SXin Li
223*9c5db199SXin Li    if offer_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
224*9c5db199SXin Li        print("Server didn't offer the IP we expected.")
225*9c5db199SXin Li        return False
226*9c5db199SXin Li
227*9c5db199SXin Li    print("Offer looks good to the client, sending request.")
228*9c5db199SXin Li    # In real tests, dhcpcd formats all the DISCOVERY and REQUEST messages.  In
229*9c5db199SXin Li    # our unit test, we have to do this ourselves.
230*9c5db199SXin Li    request_message.set_option(
231*9c5db199SXin Li            dhcp_packet.OPTION_SERVER_ID,
232*9c5db199SXin Li            offer_packet.get_option(dhcp_packet.OPTION_SERVER_ID))
233*9c5db199SXin Li    request_message.set_option(
234*9c5db199SXin Li            dhcp_packet.OPTION_SUBNET_MASK,
235*9c5db199SXin Li            offer_packet.get_option(dhcp_packet.OPTION_SUBNET_MASK))
236*9c5db199SXin Li    request_message.set_option(
237*9c5db199SXin Li            dhcp_packet.OPTION_IP_LEASE_TIME,
238*9c5db199SXin Li            offer_packet.get_option(dhcp_packet.OPTION_IP_LEASE_TIME))
239*9c5db199SXin Li    request_message.set_option(
240*9c5db199SXin Li            dhcp_packet.OPTION_REQUESTED_IP,
241*9c5db199SXin Li            offer_packet.get_option(dhcp_packet.OPTION_REQUESTED_IP))
242*9c5db199SXin Li    # Send the REQUEST message.
243*9c5db199SXin Li    client_socket.sendto(request_message.to_binary_string(),
244*9c5db199SXin Li                         (server_ip, 8067))
245*9c5db199SXin Li    ack_packet = receive_packet(client_socket)
246*9c5db199SXin Li    if ack_packet is None:
247*9c5db199SXin Li        return False
248*9c5db199SXin Li
249*9c5db199SXin Li    if (ack_packet.message_type != dhcp_packet.MESSAGE_TYPE_ACK):
250*9c5db199SXin Li        print("Type of DHCP response is not acknowledgement.")
251*9c5db199SXin Li        return False
252*9c5db199SXin Li
253*9c5db199SXin Li    if ack_packet.get_field(dhcp_packet.FIELD_YOUR_IP) != intended_ip:
254*9c5db199SXin Li        print("Server didn't give us the IP we expected.")
255*9c5db199SXin Li        return False
256*9c5db199SXin Li
257*9c5db199SXin Li    print("Waiting for the server to finish.")
258*9c5db199SXin Li    server.wait_for_test_to_finish()
259*9c5db199SXin Li    print("Server agrees that the test is over.")
260*9c5db199SXin Li    if not server.last_test_passed:
261*9c5db199SXin Li        print("Server is unhappy with the test result.")
262*9c5db199SXin Li        return False
263*9c5db199SXin Li
264*9c5db199SXin Li    print("test_simple_server_exchange PASSED.")
265*9c5db199SXin Li    return True
266*9c5db199SXin Li
267*9c5db199SXin Lidef test_server_dialogue():
268*9c5db199SXin Li    server = dhcp_test_server.DhcpTestServer(ingress_address="127.0.0.1",
269*9c5db199SXin Li                                             ingress_port=8067,
270*9c5db199SXin Li                                             broadcast_address="127.0.0.1",
271*9c5db199SXin Li                                             broadcast_port=8068)
272*9c5db199SXin Li    server.start()
273*9c5db199SXin Li    ret = False
274*9c5db199SXin Li    if server.is_healthy:
275*9c5db199SXin Li        ret = test_simple_server_exchange(server)
276*9c5db199SXin Li    else:
277*9c5db199SXin Li        print("Server isn't healthy, aborting.")
278*9c5db199SXin Li    print("Sending server stop() signal.")
279*9c5db199SXin Li    server.stop()
280*9c5db199SXin Li    print("Stop signal sent.")
281*9c5db199SXin Li    return ret
282*9c5db199SXin Li
283*9c5db199SXin Lidef run_tests():
284*9c5db199SXin Li    logger = logging.getLogger("dhcp")
285*9c5db199SXin Li    logger.setLevel(logging.DEBUG)
286*9c5db199SXin Li    stream_handler = logging.StreamHandler()
287*9c5db199SXin Li    stream_handler.setLevel(logging.DEBUG)
288*9c5db199SXin Li    logger.addHandler(stream_handler)
289*9c5db199SXin Li    retval = test_packet_serialization()
290*9c5db199SXin Li    retval &= test_classless_static_route_parsing()
291*9c5db199SXin Li    retval &= test_classless_static_route_serialization()
292*9c5db199SXin Li    retval &= test_domain_search_list_parsing()
293*9c5db199SXin Li    retval &= test_domain_search_list_serialization()
294*9c5db199SXin Li    retval &= test_broken_domain_search_list_parsing()
295*9c5db199SXin Li    retval &= test_server_dialogue()
296*9c5db199SXin Li    if retval:
297*9c5db199SXin Li        print("All tests PASSED.")
298*9c5db199SXin Li        return 0
299*9c5db199SXin Li    else:
300*9c5db199SXin Li        print("Some tests FAILED")
301*9c5db199SXin Li        return -1
302*9c5db199SXin Li
303*9c5db199SXin Liif __name__ == "__main__":
304*9c5db199SXin Li    sys.exit(run_tests())
305