1*9c5db199SXin Li#!/usr/bin/python3 2*9c5db199SXin Li# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be 4*9c5db199SXin Li# found in the LICENSE file. 5*9c5db199SXin Li 6*9c5db199SXin Liimport unittest 7*9c5db199SXin Li 8*9c5db199SXin Liimport dpkt 9*9c5db199SXin Liimport socket 10*9c5db199SXin Li 11*9c5db199SXin Liimport common 12*9c5db199SXin Li 13*9c5db199SXin Lifrom autotest_lib.client.cros.netprotos import fake_host 14*9c5db199SXin Lifrom autotest_lib.client.cros.netprotos import zeroconf 15*9c5db199SXin Li 16*9c5db199SXin Li 17*9c5db199SXin LiFAKE_HOSTNAME = 'fakehost1' 18*9c5db199SXin Li 19*9c5db199SXin LiFAKE_IPADDR = '192.168.11.22' 20*9c5db199SXin Li 21*9c5db199SXin Li 22*9c5db199SXin Liclass TestZeroconfDaemon(unittest.TestCase): 23*9c5db199SXin Li """Test class for ZeroconfDaemon.""" 24*9c5db199SXin Li 25*9c5db199SXin Li def setUp(self): 26*9c5db199SXin Li self._host = fake_host.FakeHost(FAKE_IPADDR) 27*9c5db199SXin Li self._zero = zeroconf.ZeroconfDaemon(self._host, FAKE_HOSTNAME) 28*9c5db199SXin Li 29*9c5db199SXin Li 30*9c5db199SXin Li def _query_A(self, name): 31*9c5db199SXin Li """Returns the list of A records matching the given name. 32*9c5db199SXin Li 33*9c5db199SXin Li @param name: A domain name. 34*9c5db199SXin Li @return a list of dpkt.dns.DNS.RR objects, one for each matching record. 35*9c5db199SXin Li """ 36*9c5db199SXin Li q = dpkt.dns.DNS.Q(name=name, type=dpkt.dns.DNS_A) 37*9c5db199SXin Li return self._zero._process_A(q) 38*9c5db199SXin Li 39*9c5db199SXin Li 40*9c5db199SXin Li def testRegisterService(self): 41*9c5db199SXin Li """Tests that we get appropriate records after registering a service.""" 42*9c5db199SXin Li SERVICE_PORT = 9 43*9c5db199SXin Li SERVICE_TXT_LIST = ['lies=lies'] 44*9c5db199SXin Li self._zero.register_service('unique_prefix', '_service_type', 45*9c5db199SXin Li '_tcp', SERVICE_PORT, SERVICE_TXT_LIST) 46*9c5db199SXin Li name = '_service_type._tcp.local' 47*9c5db199SXin Li fq_name = 'unique_prefix.' + name 48*9c5db199SXin Li # Issue SRV, PTR, and TXT queries 49*9c5db199SXin Li q_srv = dpkt.dns.DNS.Q(name=fq_name, type=dpkt.dns.DNS_SRV) 50*9c5db199SXin Li q_txt = dpkt.dns.DNS.Q(name=fq_name, type=dpkt.dns.DNS_TXT) 51*9c5db199SXin Li q_ptr = dpkt.dns.DNS.Q(name=name, type=dpkt.dns.DNS_PTR) 52*9c5db199SXin Li ptr_responses = self._zero._process_PTR(q_ptr) 53*9c5db199SXin Li srv_responses = self._zero._process_SRV(q_srv) 54*9c5db199SXin Li txt_responses = self._zero._process_TXT(q_txt) 55*9c5db199SXin Li self.assertTrue(ptr_responses) 56*9c5db199SXin Li self.assertTrue(srv_responses) 57*9c5db199SXin Li self.assertTrue(txt_responses) 58*9c5db199SXin Li ptr_resp = ptr_responses[0] 59*9c5db199SXin Li srv_resp = [resp for resp in srv_responses 60*9c5db199SXin Li if resp.type == dpkt.dns.DNS_SRV][0] 61*9c5db199SXin Li txt_resp = txt_responses[0] 62*9c5db199SXin Li # Check that basic things are right. 63*9c5db199SXin Li self.assertEqual(fq_name, ptr_resp.ptrname) 64*9c5db199SXin Li self.assertEqual(FAKE_HOSTNAME + '.' + self._zero.domain, 65*9c5db199SXin Li srv_resp.srvname) 66*9c5db199SXin Li self.assertEqual(SERVICE_PORT, srv_resp.port) 67*9c5db199SXin Li self.assertEqual(SERVICE_TXT_LIST, txt_resp.text) 68*9c5db199SXin Li 69*9c5db199SXin Li 70*9c5db199SXin Li def testProperties(self): 71*9c5db199SXin Li """Test the initial properties set by the constructor.""" 72*9c5db199SXin Li self.assertEqual(self._zero.host, self._host) 73*9c5db199SXin Li self.assertEqual(self._zero.hostname, FAKE_HOSTNAME) 74*9c5db199SXin Li self.assertEqual(self._zero.domain, 'local') # Default domain 75*9c5db199SXin Li self.assertEqual(self._zero.full_hostname, FAKE_HOSTNAME + '.local') 76*9c5db199SXin Li 77*9c5db199SXin Li 78*9c5db199SXin Li def testSocketInit(self): 79*9c5db199SXin Li """Test that the constructor listens for mDNS traffic.""" 80*9c5db199SXin Li 81*9c5db199SXin Li # Should create an UDP socket and bind it to the mDNS address and port. 82*9c5db199SXin Li self.assertEqual(len(self._host._sockets), 1) 83*9c5db199SXin Li sock = self._host._sockets[0] 84*9c5db199SXin Li 85*9c5db199SXin Li self.assertEqual(sock._family, socket.AF_INET) # IPv4 86*9c5db199SXin Li self.assertEqual(sock._sock_type, socket.SOCK_DGRAM) # UDP 87*9c5db199SXin Li 88*9c5db199SXin Li # Check it is listening for UDP packets on the mDNS address and port. 89*9c5db199SXin Li self.assertTrue(sock._bound) 90*9c5db199SXin Li self.assertEqual(sock._bind_ip_addr, '224.0.0.251') # mDNS address 91*9c5db199SXin Li self.assertEqual(sock._bind_port, 5353) # mDNS port 92*9c5db199SXin Li self.assertTrue(callable(sock._bind_recv_callback)) 93*9c5db199SXin Li 94*9c5db199SXin Li 95*9c5db199SXin Li def testRecordsInit(self): 96*9c5db199SXin Li """Test the A record of the host is registered.""" 97*9c5db199SXin Li host_A = self._query_A(self._zero.full_hostname) 98*9c5db199SXin Li self.assertGreater(len(host_A), 0) 99*9c5db199SXin Li 100*9c5db199SXin Li record = host_A[0] 101*9c5db199SXin Li # Check the hostname and the packed IP address. 102*9c5db199SXin Li self.assertEqual(record.name, self._zero.full_hostname) 103*9c5db199SXin Li self.assertEqual(record.ip, socket.inet_aton(self._host.ip_addr)) 104*9c5db199SXin Li 105*9c5db199SXin Li 106*9c5db199SXin Li def testDoubleTXTProcessing(self): 107*9c5db199SXin Li """Test when more than one TXT record is present in a packet. 108*9c5db199SXin Li 109*9c5db199SXin Li A mDNS packet can include several answer records for several domains and 110*9c5db199SXin Li record type. A corner case found on the field presents a mDNS packet 111*9c5db199SXin Li with two TXT records for the same domain name on the same packet on its 112*9c5db199SXin Li authoritative answers section while the packet itself is a query. 113*9c5db199SXin Li """ 114*9c5db199SXin Li # Build the mDNS packet with two TXT records. 115*9c5db199SXin Li domain_name = 'other_host.local' 116*9c5db199SXin Li answers = [ 117*9c5db199SXin Li dpkt.dns.DNS.RR(type=dpkt.dns.DNS_TXT, 118*9c5db199SXin Li cls=dpkt.dns.DNS_IN, 119*9c5db199SXin Li ttl=120, 120*9c5db199SXin Li name=domain_name, 121*9c5db199SXin Li text=['one'.encode(), 'two'.encode()]), 122*9c5db199SXin Li dpkt.dns.DNS.RR(type=dpkt.dns.DNS_TXT, 123*9c5db199SXin Li cls=dpkt.dns.DNS_IN, 124*9c5db199SXin Li ttl=120, 125*9c5db199SXin Li name=domain_name, 126*9c5db199SXin Li text=['two'.encode()]) 127*9c5db199SXin Li ] 128*9c5db199SXin Li # The packet is a query packet, with extra answers on the autoritative 129*9c5db199SXin Li # section. 130*9c5db199SXin Li mdns = dpkt.dns.DNS( 131*9c5db199SXin Li op = dpkt.dns.DNS_QUERY, # Standard query 132*9c5db199SXin Li rcode = dpkt.dns.DNS_RCODE_NOERR, 133*9c5db199SXin Li q = [], 134*9c5db199SXin Li an = [], 135*9c5db199SXin Li ns = answers) 136*9c5db199SXin Li 137*9c5db199SXin Li # Record the new answers received on the answer_calls list. 138*9c5db199SXin Li answer_calls = [] 139*9c5db199SXin Li self._zero.add_answer_observer(lambda args: answer_calls.extend(args)) 140*9c5db199SXin Li 141*9c5db199SXin Li # Send the packet to the registered callback. 142*9c5db199SXin Li sock = self._host._sockets[0] 143*9c5db199SXin Li cbk = sock._bind_recv_callback 144*9c5db199SXin Li cbk(bytes(mdns), 1234, 5353) 145*9c5db199SXin Li 146*9c5db199SXin Li # Check that the answers callback is called with all the answers in the 147*9c5db199SXin Li # received order. 148*9c5db199SXin Li self.assertEqual(len(answer_calls), 2) 149*9c5db199SXin Li ans1, ans2 = answer_calls # Each ans is a (rrtype, rrname, data) 150*9c5db199SXin Li self.assertEqual(ans1[2], ('one', 'two')) 151*9c5db199SXin Li self.assertEqual(ans2[2], ('two',)) 152*9c5db199SXin Li 153*9c5db199SXin Li # Check that the two records were cached. 154*9c5db199SXin Li records = self._zero.cached_results(domain_name, dpkt.dns.DNS_TXT) 155*9c5db199SXin Li self.assertEqual(len(records), 2) 156*9c5db199SXin Li 157*9c5db199SXin Li 158*9c5db199SXin Liif __name__ == '__main__': 159*9c5db199SXin Li unittest.main() 160