xref: /aosp_15_r20/external/openthread/tests/scripts/thread-cert/v1_2_test_enhanced_keep_alive.py (revision cfb92d1480a9e65faed56933e9c12405f45898b4)
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2019, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import unittest
31
32import thread_cert
33import config
34import mle
35
36LEADER = 1
37SED_1 = 2
38
39CHILD_TIMEOUT = 30
40
41DEFAULT_POLL_PERIOD = CHILD_TIMEOUT - 4
42"""The default poll period calculated by ot::Mac::DataPollSender::GetDefaultPollPeriod()."""
43
44USER_POLL_PERIOD = CHILD_TIMEOUT // 3
45"""The poll period explicitly set by this test for verifying enhanced keep-alive."""
46
47
48class SED_EnhancedKeepAlive(thread_cert.TestCase):
49    TOPOLOGY = {
50        LEADER: {
51            'version': '1.2'
52        },
53        SED_1: {
54            'mode': '-',
55            'version': '1.2',
56        },
57    }
58    """All nodes are created with default configurations"""
59
60    def test(self):
61        self.nodes[SED_1].set_timeout(CHILD_TIMEOUT)
62        self.nodes[SED_1].set_pollperiod(USER_POLL_PERIOD * 1000)
63
64        self.nodes[LEADER].start()
65        self.simulator.go(config.LEADER_STARTUP_DELAY)
66        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')
67
68        self.nodes[SED_1].start()
69        self.simulator.go(7)
70        self.assertEqual(self.nodes[SED_1].get_state(), 'child')
71
72        leader_messages = self.simulator.get_messages_sent_by(LEADER)
73        sed_messages = self.simulator.get_messages_sent_by(SED_1)
74
75        # 1 - Leader transmits MLE advertisements
76        msg = leader_messages.next_mle_message(mle.CommandType.ADVERTISEMENT)
77        msg.assertSentWithHopLimit(255)
78        msg.assertSentToDestinationAddress('ff02::1')
79        msg.assertMleMessageContainsTlv(mle.SourceAddress)
80        msg.assertMleMessageContainsTlv(mle.LeaderData)
81        msg.assertMleMessageContainsTlv(mle.Route64)
82
83        # 2 - SED_1 begins attach process by sending a multicast MLE Parent Request
84        msg = sed_messages.next_mle_message(mle.CommandType.PARENT_REQUEST)
85        msg.assertSentWithHopLimit(255)
86        msg.assertSentToDestinationAddress('ff02::2')
87        msg.assertMleMessageContainsTlv(mle.Mode)
88        msg.assertMleMessageContainsTlv(mle.Challenge)
89        msg.assertMleMessageContainsTlv(mle.ScanMask)
90        msg.assertMleMessageContainsTlv(mle.Version)
91        self.assertGreaterEqual(msg.get_mle_message_tlv(mle.Version).version, config.THREAD_VERSION_1_2)
92
93        scan_mask_tlv = msg.get_mle_message_tlv(mle.ScanMask)
94        self.assertEqual(1, scan_mask_tlv.router)
95        self.assertEqual(0, scan_mask_tlv.end_device)
96
97        # 3 - Leader sends a MLE Parent Response
98        msg = leader_messages.next_mle_message(mle.CommandType.PARENT_RESPONSE)
99        msg.assertSentToNode(self.nodes[SED_1])
100        msg.assertMleMessageContainsTlv(mle.SourceAddress)
101        msg.assertMleMessageContainsTlv(mle.LeaderData)
102        msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
103        msg.assertMleMessageContainsOptionalTlv(mle.MleFrameCounter)
104        msg.assertMleMessageContainsTlv(mle.Response)
105        msg.assertMleMessageContainsTlv(mle.Challenge)
106        msg.assertMleMessageContainsTlv(mle.LinkMargin)
107        msg.assertMleMessageContainsTlv(mle.Connectivity)
108        msg.assertMleMessageContainsTlv(mle.Version)
109        self.assertGreaterEqual(msg.get_mle_message_tlv(mle.Version).version, config.THREAD_VERSION_1_2)
110
111        # 4 - SED_1 receives the MLE Parent Response and sends a Child ID Request
112        msg = sed_messages.next_mle_message(mle.CommandType.CHILD_ID_REQUEST)
113        msg.assertSentToNode(self.nodes[LEADER])
114        msg.assertMleMessageContainsTlv(mle.Response)
115        msg.assertMleMessageContainsTlv(mle.LinkLayerFrameCounter)
116        msg.assertMleMessageContainsOptionalTlv(mle.MleFrameCounter)
117        msg.assertMleMessageContainsTlv(mle.Mode)
118        msg.assertMleMessageContainsTlv(mle.Timeout)
119        msg.assertMleMessageContainsTlv(mle.Version)
120        msg.assertMleMessageContainsTlv(mle.TlvRequest)
121        self.assertGreaterEqual(msg.get_mle_message_tlv(mle.Version).version, config.THREAD_VERSION_1_2)
122
123        # 5 - Leader responds with a Child ID Response
124        msg = leader_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
125        msg.assertSentToNode(self.nodes[SED_1])
126        msg.assertMleMessageContainsTlv(mle.SourceAddress)
127        msg.assertMleMessageContainsTlv(mle.LeaderData)
128        msg.assertMleMessageContainsTlv(mle.Address16)
129        msg.assertMleMessageContainsOptionalTlv(mle.NetworkData)
130        msg.assertMleMessageContainsOptionalTlv(mle.Route64)
131        msg.assertMleMessageContainsOptionalTlv(mle.AddressRegistration)
132
133        leader_aloc = self.nodes[LEADER].get_addr_leader_aloc()
134        self.assertTrue(self.nodes[SED_1].ping(leader_aloc, timeout=USER_POLL_PERIOD * 2))
135
136        # 6 - Timeout Child
137        self.nodes[SED_1].set_pollperiod(CHILD_TIMEOUT * 1000 * 4)
138        self.simulator.go(CHILD_TIMEOUT + 1)
139        self.assertEqual(self.nodes[SED_1].get_state(), 'child')
140        self.assertFalse(self.nodes[SED_1].ping(leader_aloc, timeout=USER_POLL_PERIOD * 2))
141        self.flush_all()
142
143        self.nodes[SED_1].stop()
144        self.nodes[SED_1].set_pollperiod(USER_POLL_PERIOD * 1000)
145        self.nodes[SED_1].start()
146
147        # 7 - Wait SED_1 to re-attach
148        self.simulator.go(240)
149        self.assertEqual(self.nodes[SED_1].get_state(), 'child')
150        leader_messages = self.simulator.get_messages_sent_by(LEADER)
151        msg = leader_messages.next_mle_message(mle.CommandType.CHILD_ID_RESPONSE)
152        msg.assertSentToNode(self.nodes[SED_1])
153        msg.assertMleMessageContainsTlv(mle.SourceAddress)
154        msg.assertMleMessageContainsTlv(mle.LeaderData)
155        msg.assertMleMessageContainsTlv(mle.Address16)
156        msg.assertMleMessageContainsOptionalTlv(mle.NetworkData)
157        msg.assertMleMessageContainsOptionalTlv(mle.Route64)
158        msg.assertMleMessageContainsOptionalTlv(mle.AddressRegistration)
159        self.assertTrue(self.nodes[SED_1].ping(leader_aloc, timeout=USER_POLL_PERIOD * 2))
160        self.flush_all()
161
162        # 8 - Verify enhanced keep-alive works
163        self.nodes[SED_1].set_pollperiod(CHILD_TIMEOUT * 1000 * 4)
164        self.simulator.go(CHILD_TIMEOUT // 2)
165        self.assertEqual(self.nodes[SED_1].get_state(), 'child')
166        non_exist_addr = leader_aloc.replace('fc00', 'fc12')
167        self.assertFalse(self.nodes[SED_1].ping(non_exist_addr))
168        self.simulator.go(CHILD_TIMEOUT // 2)
169        self.nodes[SED_1].set_pollperiod(USER_POLL_PERIOD * 1000)
170        self.assertTrue(self.nodes[SED_1].ping(leader_aloc, timeout=USER_POLL_PERIOD * 2))
171
172        # 9 - Verify child resets keep-alive timer
173        self.nodes[SED_1].set_pollperiod(DEFAULT_POLL_PERIOD * 1000)
174        self.simulator.go(DEFAULT_POLL_PERIOD // 3 * 2)
175        self.flush_all()
176        self.nodes[SED_1].ping(leader_aloc, timeout=1)
177        self.simulator.go(DEFAULT_POLL_PERIOD // 3 * 2)
178        sed_messages = self.simulator.get_messages_sent_by(SED_1)
179        self.assertEqual(sed_messages.next_data_poll(), None)
180
181
182if __name__ == '__main__':
183    unittest.main()
184