1#!/usr/bin/env python3 2# 3# Copyright (c) 2021, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import unittest 31 32import config 33import thread_cert 34 35ROUTER1 = 1 36ROUTER2 = 2 37ROUTER3 = 3 38ROUTER4 = 4 39ROUTER5 = 5 40 41TEST_PREFIX = '2001:dead:beef:cafe::/64' 42TEST_UDP_BYTES = 16 43TEST_UDP_PORT = 12345 44 45 46class TestAnycast(thread_cert.TestCase): 47 SUPPORT_NCP = False 48 49 TOPOLOGY = { 50 ROUTER1: { 51 'mode': 'rdn', 52 'allowlist': [ROUTER2], 53 }, 54 ROUTER2: { 55 'mode': 'rdn', 56 'allowlist': [ROUTER1, ROUTER3], 57 }, 58 ROUTER3: { 59 'mode': 'rdn', 60 'allowlist': [ROUTER2, ROUTER4], 61 }, 62 ROUTER4: { 63 'mode': 'rdn', 64 'allowlist': [ROUTER3, ROUTER5], 65 }, 66 ROUTER5: { 67 'mode': 'rdn', 68 'allowlist': [ROUTER4], 69 }, 70 } 71 72 def test(self): 73 self.nodes[ROUTER1].start() 74 self.simulator.go(config.LEADER_STARTUP_DELAY) 75 self.assertEqual(self.nodes[ROUTER1].get_state(), 'leader') 76 self.nodes[ROUTER1].udp_start('::', TEST_UDP_PORT) 77 78 for i in range(ROUTER2, ROUTER5 + 1): 79 self.nodes[i].start() 80 self.simulator.go(config.ROUTER_STARTUP_DELAY) 81 self.assertEqual(self.nodes[i].get_state(), 'router') 82 self.nodes[i].udp_start('::', TEST_UDP_PORT) 83 84 self.__test('ds') # DHCPv6 Agent (IPv6 Address) 85 self.__test('cs') # DHCPv6 Agent (Other) 86 self.__test('ns') # Neighbor Discovery Agent 87 88 def __test(self, borderRouterFlags): 89 # Single Server 90 self.nodes[ROUTER2].add_prefix(TEST_PREFIX, borderRouterFlags) 91 self.nodes[ROUTER2].register_netdata() 92 self.simulator.go(5) 93 print(self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.ALOC)) 94 self.assertTrue(len(self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.ALOC)) == 1) 95 96 self.nodes[ROUTER3].udp_send(TEST_UDP_BYTES, self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.ALOC)[0], 97 TEST_UDP_PORT) 98 self.simulator.go(1) 99 self.nodes[ROUTER2].udp_check_rx(TEST_UDP_BYTES) 100 101 # Multiple Servers 102 self.nodes[ROUTER5].add_prefix(TEST_PREFIX, borderRouterFlags) 103 self.nodes[ROUTER5].register_netdata() 104 self.simulator.go(5) 105 self.assertTrue(len(self.nodes[ROUTER5].get_ip6_address(config.ADDRESS_TYPE.ALOC)) == 1) 106 107 self.nodes[ROUTER3].udp_send(TEST_UDP_BYTES, self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.ALOC)[0], 108 TEST_UDP_PORT) 109 self.simulator.go(1) 110 self.nodes[ROUTER2].udp_check_rx(TEST_UDP_BYTES) 111 112 self.nodes[ROUTER4].udp_send(TEST_UDP_BYTES, self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.ALOC)[0], 113 TEST_UDP_PORT) 114 self.simulator.go(1) 115 self.nodes[ROUTER5].udp_check_rx(TEST_UDP_BYTES) 116 117 # Single Server 118 self.nodes[ROUTER2].remove_prefix(TEST_PREFIX) 119 self.nodes[ROUTER2].register_netdata() 120 self.simulator.go(5) 121 self.assertTrue(len(self.nodes[ROUTER2].get_ip6_address(config.ADDRESS_TYPE.ALOC)) == 0) 122 123 self.nodes[ROUTER3].udp_send(TEST_UDP_BYTES, self.nodes[ROUTER5].get_ip6_address(config.ADDRESS_TYPE.ALOC)[0], 124 TEST_UDP_PORT) 125 self.simulator.go(1) 126 self.nodes[ROUTER5].udp_check_rx(TEST_UDP_BYTES) 127 128 self.nodes[ROUTER4].udp_send(TEST_UDP_BYTES, self.nodes[ROUTER5].get_ip6_address(config.ADDRESS_TYPE.ALOC)[0], 129 TEST_UDP_PORT) 130 self.simulator.go(1) 131 self.nodes[ROUTER5].udp_check_rx(TEST_UDP_BYTES) 132 133 self.nodes[ROUTER5].remove_prefix(TEST_PREFIX) 134 self.nodes[ROUTER5].register_netdata() 135 self.simulator.go(5) 136 self.assertTrue(len(self.nodes[ROUTER5].get_ip6_address(config.ADDRESS_TYPE.ALOC)) == 0) 137 138 139if __name__ == '__main__': 140 unittest.main() 141