xref: /aosp_15_r20/external/openthread/tests/scripts/thread-cert/ipv6.py (revision cfb92d1480a9e65faed56933e9c12405f45898b4)
1*cfb92d14SAndroid Build Coastguard Worker#!/usr/bin/env python3
2*cfb92d14SAndroid Build Coastguard Worker#
3*cfb92d14SAndroid Build Coastguard Worker#  Copyright (c) 2016, The OpenThread Authors.
4*cfb92d14SAndroid Build Coastguard Worker#  All rights reserved.
5*cfb92d14SAndroid Build Coastguard Worker#
6*cfb92d14SAndroid Build Coastguard Worker#  Redistribution and use in source and binary forms, with or without
7*cfb92d14SAndroid Build Coastguard Worker#  modification, are permitted provided that the following conditions are met:
8*cfb92d14SAndroid Build Coastguard Worker#  1. Redistributions of source code must retain the above copyright
9*cfb92d14SAndroid Build Coastguard Worker#     notice, this list of conditions and the following disclaimer.
10*cfb92d14SAndroid Build Coastguard Worker#  2. Redistributions in binary form must reproduce the above copyright
11*cfb92d14SAndroid Build Coastguard Worker#     notice, this list of conditions and the following disclaimer in the
12*cfb92d14SAndroid Build Coastguard Worker#     documentation and/or other materials provided with the distribution.
13*cfb92d14SAndroid Build Coastguard Worker#  3. Neither the name of the copyright holder nor the
14*cfb92d14SAndroid Build Coastguard Worker#     names of its contributors may be used to endorse or promote products
15*cfb92d14SAndroid Build Coastguard Worker#     derived from this software without specific prior written permission.
16*cfb92d14SAndroid Build Coastguard Worker#
17*cfb92d14SAndroid Build Coastguard Worker#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18*cfb92d14SAndroid Build Coastguard Worker#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19*cfb92d14SAndroid Build Coastguard Worker#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20*cfb92d14SAndroid Build Coastguard Worker#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21*cfb92d14SAndroid Build Coastguard Worker#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22*cfb92d14SAndroid Build Coastguard Worker#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23*cfb92d14SAndroid Build Coastguard Worker#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24*cfb92d14SAndroid Build Coastguard Worker#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25*cfb92d14SAndroid Build Coastguard Worker#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26*cfb92d14SAndroid Build Coastguard Worker#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27*cfb92d14SAndroid Build Coastguard Worker#  POSSIBILITY OF SUCH DAMAGE.
28*cfb92d14SAndroid Build Coastguard Worker#
29*cfb92d14SAndroid Build Coastguard Worker
30*cfb92d14SAndroid Build Coastguard Workerimport abc
31*cfb92d14SAndroid Build Coastguard Workerimport io
32*cfb92d14SAndroid Build Coastguard Workerimport ipaddress
33*cfb92d14SAndroid Build Coastguard Workerimport struct
34*cfb92d14SAndroid Build Coastguard Worker
35*cfb92d14SAndroid Build Coastguard Workerfrom binascii import hexlify
36*cfb92d14SAndroid Build Coastguard Worker
37*cfb92d14SAndroid Build Coastguard Workerimport common
38*cfb92d14SAndroid Build Coastguard Worker
39*cfb92d14SAndroid Build Coastguard Workertry:
40*cfb92d14SAndroid Build Coastguard Worker    from itertools import izip_longest as zip_longest
41*cfb92d14SAndroid Build Coastguard Workerexcept ImportError:
42*cfb92d14SAndroid Build Coastguard Worker    from itertools import zip_longest
43*cfb92d14SAndroid Build Coastguard Worker
44*cfb92d14SAndroid Build Coastguard Worker# Next headers for IPv6 protocols
45*cfb92d14SAndroid Build Coastguard WorkerIPV6_NEXT_HEADER_HOP_BY_HOP = 0
46*cfb92d14SAndroid Build Coastguard WorkerIPV6_NEXT_HEADER_TCP = 6
47*cfb92d14SAndroid Build Coastguard WorkerIPV6_NEXT_HEADER_UDP = 17
48*cfb92d14SAndroid Build Coastguard WorkerIPV6_NEXT_HEADER_FRAGMENT = 44
49*cfb92d14SAndroid Build Coastguard WorkerIPV6_NEXT_HEADER_ICMP = 58
50*cfb92d14SAndroid Build Coastguard Worker
51*cfb92d14SAndroid Build Coastguard WorkerUPPER_LAYER_PROTOCOLS = [
52*cfb92d14SAndroid Build Coastguard Worker    IPV6_NEXT_HEADER_TCP,
53*cfb92d14SAndroid Build Coastguard Worker    IPV6_NEXT_HEADER_UDP,
54*cfb92d14SAndroid Build Coastguard Worker    IPV6_NEXT_HEADER_ICMP,
55*cfb92d14SAndroid Build Coastguard Worker]
56*cfb92d14SAndroid Build Coastguard Worker
57*cfb92d14SAndroid Build Coastguard Worker# ICMP Protocol codes
58*cfb92d14SAndroid Build Coastguard WorkerICMP_DESTINATION_UNREACHABLE = 1
59*cfb92d14SAndroid Build Coastguard WorkerICMP_TIME_EXCEEDED = 3
60*cfb92d14SAndroid Build Coastguard WorkerICMP_ECHO_REQUEST = 128
61*cfb92d14SAndroid Build Coastguard WorkerICMP_ECHO_RESPONSE = 129
62*cfb92d14SAndroid Build Coastguard Worker
63*cfb92d14SAndroid Build Coastguard Worker# Default hop limit for IPv6
64*cfb92d14SAndroid Build Coastguard WorkerHOP_LIMIT_DEFAULT = 64
65*cfb92d14SAndroid Build Coastguard Worker
66*cfb92d14SAndroid Build Coastguard Worker
67*cfb92d14SAndroid Build Coastguard Workerdef calculate_checksum(data):
68*cfb92d14SAndroid Build Coastguard Worker    """ Calculate checksum from data bytes.
69*cfb92d14SAndroid Build Coastguard Worker
70*cfb92d14SAndroid Build Coastguard Worker    How to calculate checksum (RFC 2460):
71*cfb92d14SAndroid Build Coastguard Worker        https://tools.ietf.org/html/rfc2460#page-27
72*cfb92d14SAndroid Build Coastguard Worker
73*cfb92d14SAndroid Build Coastguard Worker    Args:
74*cfb92d14SAndroid Build Coastguard Worker        data (bytes): input data from which checksum will be calculated
75*cfb92d14SAndroid Build Coastguard Worker
76*cfb92d14SAndroid Build Coastguard Worker    Returns:
77*cfb92d14SAndroid Build Coastguard Worker        int: calculated checksum
78*cfb92d14SAndroid Build Coastguard Worker    """
79*cfb92d14SAndroid Build Coastguard Worker    # Create halfwords from data bytes. Example: data[0] = 0x01, data[1] =
80*cfb92d14SAndroid Build Coastguard Worker    # 0xb2 => 0x01b2
81*cfb92d14SAndroid Build Coastguard Worker    halfwords = [((byte0 << 8) | byte1) for byte0, byte1 in zip_longest(data[::2], data[1::2], fillvalue=0x00)]
82*cfb92d14SAndroid Build Coastguard Worker
83*cfb92d14SAndroid Build Coastguard Worker    checksum = 0
84*cfb92d14SAndroid Build Coastguard Worker    for halfword in halfwords:
85*cfb92d14SAndroid Build Coastguard Worker        checksum += halfword
86*cfb92d14SAndroid Build Coastguard Worker        checksum = (checksum & 0xffff) + (checksum >> 16)
87*cfb92d14SAndroid Build Coastguard Worker
88*cfb92d14SAndroid Build Coastguard Worker    checksum ^= 0xffff
89*cfb92d14SAndroid Build Coastguard Worker
90*cfb92d14SAndroid Build Coastguard Worker    if checksum == 0:
91*cfb92d14SAndroid Build Coastguard Worker        return 0xffff
92*cfb92d14SAndroid Build Coastguard Worker    else:
93*cfb92d14SAndroid Build Coastguard Worker        return checksum
94*cfb92d14SAndroid Build Coastguard Worker
95*cfb92d14SAndroid Build Coastguard Worker
96*cfb92d14SAndroid Build Coastguard Workerdef synthesize_ip6_address(ip6_network: ipaddress.IPv6Network,
97*cfb92d14SAndroid Build Coastguard Worker                           ip4_address: ipaddress.IPv4Address) -> ipaddress.IPv6Address:
98*cfb92d14SAndroid Build Coastguard Worker    """ Synthesize an IPv6 address from a prefix for NAT64 and an IPv4 address.
99*cfb92d14SAndroid Build Coastguard Worker
100*cfb92d14SAndroid Build Coastguard Worker    Only supports /96 network for now.
101*cfb92d14SAndroid Build Coastguard Worker
102*cfb92d14SAndroid Build Coastguard Worker    Args:
103*cfb92d14SAndroid Build Coastguard Worker        ip6_network: The network for NAT64.
104*cfb92d14SAndroid Build Coastguard Worker        ip4_address: The IPv4 address.
105*cfb92d14SAndroid Build Coastguard Worker
106*cfb92d14SAndroid Build Coastguard Worker    Returns:
107*cfb92d14SAndroid Build Coastguard Worker        ipaddress.IPv6Address: The synthesized IPv6 address.
108*cfb92d14SAndroid Build Coastguard Worker    """
109*cfb92d14SAndroid Build Coastguard Worker    if ip6_network.prefixlen != 96:
110*cfb92d14SAndroid Build Coastguard Worker        # We are only using /96 networks in openthread
111*cfb92d14SAndroid Build Coastguard Worker        raise NotImplementedError("synthesize_ip6_address only supports /96 networks")
112*cfb92d14SAndroid Build Coastguard Worker    return ipaddress.IPv6Address(int(ip6_network.network_address) | int(ip4_address))
113*cfb92d14SAndroid Build Coastguard Worker
114*cfb92d14SAndroid Build Coastguard Worker
115*cfb92d14SAndroid Build Coastguard Workerclass PacketFactory(object):
116*cfb92d14SAndroid Build Coastguard Worker    """ Interface for classes that produce objects from data. """
117*cfb92d14SAndroid Build Coastguard Worker
118*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
119*cfb92d14SAndroid Build Coastguard Worker        """ Convert data to object.
120*cfb92d14SAndroid Build Coastguard Worker
121*cfb92d14SAndroid Build Coastguard Worker        Args:
122*cfb92d14SAndroid Build Coastguard Worker            data (BytesIO)
123*cfb92d14SAndroid Build Coastguard Worker            message_info (MessageInfo)
124*cfb92d14SAndroid Build Coastguard Worker
125*cfb92d14SAndroid Build Coastguard Worker        """
126*cfb92d14SAndroid Build Coastguard Worker        raise NotImplementedError
127*cfb92d14SAndroid Build Coastguard Worker
128*cfb92d14SAndroid Build Coastguard Worker
129*cfb92d14SAndroid Build Coastguard Workerclass BuildableFromBytes(object):
130*cfb92d14SAndroid Build Coastguard Worker    """ Interface for classes which can be built from bytes. """
131*cfb92d14SAndroid Build Coastguard Worker
132*cfb92d14SAndroid Build Coastguard Worker    @classmethod
133*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
134*cfb92d14SAndroid Build Coastguard Worker        """ Convert data to object.
135*cfb92d14SAndroid Build Coastguard Worker
136*cfb92d14SAndroid Build Coastguard Worker        Args:
137*cfb92d14SAndroid Build Coastguard Worker            data (bytes)
138*cfb92d14SAndroid Build Coastguard Worker
139*cfb92d14SAndroid Build Coastguard Worker        """
140*cfb92d14SAndroid Build Coastguard Worker        raise NotImplementedError
141*cfb92d14SAndroid Build Coastguard Worker
142*cfb92d14SAndroid Build Coastguard Worker
143*cfb92d14SAndroid Build Coastguard Workerclass ConvertibleToBytes(object):
144*cfb92d14SAndroid Build Coastguard Worker    """ Interface for classes which can be converted to bytes. """
145*cfb92d14SAndroid Build Coastguard Worker
146*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
147*cfb92d14SAndroid Build Coastguard Worker        """ Convert object to data.
148*cfb92d14SAndroid Build Coastguard Worker
149*cfb92d14SAndroid Build Coastguard Worker        Returns:
150*cfb92d14SAndroid Build Coastguard Worker            bytes
151*cfb92d14SAndroid Build Coastguard Worker        """
152*cfb92d14SAndroid Build Coastguard Worker        raise NotImplementedError
153*cfb92d14SAndroid Build Coastguard Worker
154*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
155*cfb92d14SAndroid Build Coastguard Worker        """ Length of data (in bytes).
156*cfb92d14SAndroid Build Coastguard Worker
157*cfb92d14SAndroid Build Coastguard Worker        Returns:
158*cfb92d14SAndroid Build Coastguard Worker            int
159*cfb92d14SAndroid Build Coastguard Worker        """
160*cfb92d14SAndroid Build Coastguard Worker        raise NotImplementedError
161*cfb92d14SAndroid Build Coastguard Worker
162*cfb92d14SAndroid Build Coastguard Worker
163*cfb92d14SAndroid Build Coastguard Workerclass Header(object):
164*cfb92d14SAndroid Build Coastguard Worker    """ Interface for header classes. """
165*cfb92d14SAndroid Build Coastguard Worker
166*cfb92d14SAndroid Build Coastguard Worker    __metaclass__ = abc.ABCMeta
167*cfb92d14SAndroid Build Coastguard Worker
168*cfb92d14SAndroid Build Coastguard Worker    @abc.abstractproperty
169*cfb92d14SAndroid Build Coastguard Worker    def type(self):
170*cfb92d14SAndroid Build Coastguard Worker        """ Number which can be used in the next header field in IPv6 header or next headers.
171*cfb92d14SAndroid Build Coastguard Worker
172*cfb92d14SAndroid Build Coastguard Worker        Returns:
173*cfb92d14SAndroid Build Coastguard Worker            int
174*cfb92d14SAndroid Build Coastguard Worker        """
175*cfb92d14SAndroid Build Coastguard Worker
176*cfb92d14SAndroid Build Coastguard Worker
177*cfb92d14SAndroid Build Coastguard Workerclass ExtensionHeader(object):
178*cfb92d14SAndroid Build Coastguard Worker    """ Base for classes representing Extension Headers in IPv6 packets. """
179*cfb92d14SAndroid Build Coastguard Worker
180*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, next_header, hdr_ext_len=0):
181*cfb92d14SAndroid Build Coastguard Worker        self.next_header = next_header
182*cfb92d14SAndroid Build Coastguard Worker        self.hdr_ext_len = hdr_ext_len
183*cfb92d14SAndroid Build Coastguard Worker
184*cfb92d14SAndroid Build Coastguard Worker
185*cfb92d14SAndroid Build Coastguard Workerclass UpperLayerProtocol(Header, ConvertibleToBytes):
186*cfb92d14SAndroid Build Coastguard Worker    """ Base for classes representing upper layer protocol payload in IPv6 packets. """
187*cfb92d14SAndroid Build Coastguard Worker
188*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, header):
189*cfb92d14SAndroid Build Coastguard Worker        self.header = header
190*cfb92d14SAndroid Build Coastguard Worker
191*cfb92d14SAndroid Build Coastguard Worker    @property
192*cfb92d14SAndroid Build Coastguard Worker    def checksum(self):
193*cfb92d14SAndroid Build Coastguard Worker        """ Return checksum from upper layer protocol header. """
194*cfb92d14SAndroid Build Coastguard Worker        return self.header.checksum
195*cfb92d14SAndroid Build Coastguard Worker
196*cfb92d14SAndroid Build Coastguard Worker    @checksum.setter
197*cfb92d14SAndroid Build Coastguard Worker    def checksum(self, value):
198*cfb92d14SAndroid Build Coastguard Worker        """ Set checksum value in upper layer protocol header. """
199*cfb92d14SAndroid Build Coastguard Worker        self.header.checksum = value
200*cfb92d14SAndroid Build Coastguard Worker
201*cfb92d14SAndroid Build Coastguard Worker    def is_valid_checksum(self):
202*cfb92d14SAndroid Build Coastguard Worker        """ Return information if set checksum is valid.
203*cfb92d14SAndroid Build Coastguard Worker
204*cfb92d14SAndroid Build Coastguard Worker        It is not possible to get zero from checksum calculation.
205*cfb92d14SAndroid Build Coastguard Worker        Zero indicates invalid checksum value.
206*cfb92d14SAndroid Build Coastguard Worker
207*cfb92d14SAndroid Build Coastguard Worker        Returns:
208*cfb92d14SAndroid Build Coastguard Worker            bool
209*cfb92d14SAndroid Build Coastguard Worker        """
210*cfb92d14SAndroid Build Coastguard Worker        return self.checksum != 0
211*cfb92d14SAndroid Build Coastguard Worker
212*cfb92d14SAndroid Build Coastguard Worker
213*cfb92d14SAndroid Build Coastguard Workerclass IPv6PseudoHeader(ConvertibleToBytes):
214*cfb92d14SAndroid Build Coastguard Worker    """ Class representing IPv6 pseudo header which is required to calculate
215*cfb92d14SAndroid Build Coastguard Worker    upper layer protocol (like e.g. UDP or ICMPv6) checksum.
216*cfb92d14SAndroid Build Coastguard Worker
217*cfb92d14SAndroid Build Coastguard Worker    This class is used only during upper layer protocol checksum calculation. Do not use it outside of this module.
218*cfb92d14SAndroid Build Coastguard Worker
219*cfb92d14SAndroid Build Coastguard Worker    """
220*cfb92d14SAndroid Build Coastguard Worker
221*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, source_address, destination_address, payload_length, next_header):
222*cfb92d14SAndroid Build Coastguard Worker        self._source_address = self._convert_to_ipaddress(source_address)
223*cfb92d14SAndroid Build Coastguard Worker        self._destination_address = self._convert_to_ipaddress(destination_address)
224*cfb92d14SAndroid Build Coastguard Worker        self.payload_length = payload_length
225*cfb92d14SAndroid Build Coastguard Worker        self.next_header = next_header
226*cfb92d14SAndroid Build Coastguard Worker
227*cfb92d14SAndroid Build Coastguard Worker    def _convert_to_ipaddress(self, value):
228*cfb92d14SAndroid Build Coastguard Worker        if isinstance(value, bytearray):
229*cfb92d14SAndroid Build Coastguard Worker            value = bytes(value)
230*cfb92d14SAndroid Build Coastguard Worker
231*cfb92d14SAndroid Build Coastguard Worker        return ipaddress.ip_address(value)
232*cfb92d14SAndroid Build Coastguard Worker
233*cfb92d14SAndroid Build Coastguard Worker    @property
234*cfb92d14SAndroid Build Coastguard Worker    def source_address(self):
235*cfb92d14SAndroid Build Coastguard Worker        return self._source_address
236*cfb92d14SAndroid Build Coastguard Worker
237*cfb92d14SAndroid Build Coastguard Worker    @source_address.setter
238*cfb92d14SAndroid Build Coastguard Worker    def source_address(self, value):
239*cfb92d14SAndroid Build Coastguard Worker        self._source_address = self._convert_to_ipaddress(value)
240*cfb92d14SAndroid Build Coastguard Worker
241*cfb92d14SAndroid Build Coastguard Worker    @property
242*cfb92d14SAndroid Build Coastguard Worker    def destination_address(self):
243*cfb92d14SAndroid Build Coastguard Worker        return self._destination_address
244*cfb92d14SAndroid Build Coastguard Worker
245*cfb92d14SAndroid Build Coastguard Worker    @destination_address.setter
246*cfb92d14SAndroid Build Coastguard Worker    def destination_address(self, value):
247*cfb92d14SAndroid Build Coastguard Worker        self._source_address = self._convert_to_ipaddress(value)
248*cfb92d14SAndroid Build Coastguard Worker
249*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
250*cfb92d14SAndroid Build Coastguard Worker        data = bytearray()
251*cfb92d14SAndroid Build Coastguard Worker        data += self.source_address.packed
252*cfb92d14SAndroid Build Coastguard Worker        data += self.destination_address.packed
253*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">I", self.payload_length)
254*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">I", self.next_header)
255*cfb92d14SAndroid Build Coastguard Worker
256*cfb92d14SAndroid Build Coastguard Worker        return data
257*cfb92d14SAndroid Build Coastguard Worker
258*cfb92d14SAndroid Build Coastguard Worker
259*cfb92d14SAndroid Build Coastguard Workerclass IPv6Header(ConvertibleToBytes, BuildableFromBytes):
260*cfb92d14SAndroid Build Coastguard Worker    """ Class representing IPv6 packet header. """
261*cfb92d14SAndroid Build Coastguard Worker
262*cfb92d14SAndroid Build Coastguard Worker    _version = 6
263*cfb92d14SAndroid Build Coastguard Worker
264*cfb92d14SAndroid Build Coastguard Worker    _header_length = 40
265*cfb92d14SAndroid Build Coastguard Worker
266*cfb92d14SAndroid Build Coastguard Worker    def __init__(
267*cfb92d14SAndroid Build Coastguard Worker        self,
268*cfb92d14SAndroid Build Coastguard Worker        source_address,
269*cfb92d14SAndroid Build Coastguard Worker        destination_address,
270*cfb92d14SAndroid Build Coastguard Worker        traffic_class=0,
271*cfb92d14SAndroid Build Coastguard Worker        flow_label=0,
272*cfb92d14SAndroid Build Coastguard Worker        hop_limit=64,
273*cfb92d14SAndroid Build Coastguard Worker        payload_length=0,
274*cfb92d14SAndroid Build Coastguard Worker        next_header=0,
275*cfb92d14SAndroid Build Coastguard Worker    ):
276*cfb92d14SAndroid Build Coastguard Worker        self.version = self._version
277*cfb92d14SAndroid Build Coastguard Worker        self._source_address = self._convert_to_ipaddress(source_address)
278*cfb92d14SAndroid Build Coastguard Worker        self._destination_address = self._convert_to_ipaddress(destination_address)
279*cfb92d14SAndroid Build Coastguard Worker        self.traffic_class = traffic_class
280*cfb92d14SAndroid Build Coastguard Worker        self.flow_label = flow_label
281*cfb92d14SAndroid Build Coastguard Worker        self.hop_limit = hop_limit
282*cfb92d14SAndroid Build Coastguard Worker        self.payload_length = payload_length
283*cfb92d14SAndroid Build Coastguard Worker        self.next_header = next_header
284*cfb92d14SAndroid Build Coastguard Worker
285*cfb92d14SAndroid Build Coastguard Worker    def _convert_to_ipaddress(self, value):
286*cfb92d14SAndroid Build Coastguard Worker        if isinstance(value, bytearray):
287*cfb92d14SAndroid Build Coastguard Worker            value = bytes(value)
288*cfb92d14SAndroid Build Coastguard Worker
289*cfb92d14SAndroid Build Coastguard Worker        return ipaddress.ip_address(value)
290*cfb92d14SAndroid Build Coastguard Worker
291*cfb92d14SAndroid Build Coastguard Worker    @property
292*cfb92d14SAndroid Build Coastguard Worker    def source_address(self):
293*cfb92d14SAndroid Build Coastguard Worker        return self._source_address
294*cfb92d14SAndroid Build Coastguard Worker
295*cfb92d14SAndroid Build Coastguard Worker    @source_address.setter
296*cfb92d14SAndroid Build Coastguard Worker    def source_address(self, value):
297*cfb92d14SAndroid Build Coastguard Worker        self._source_address = self._convert_to_ipaddress(value)
298*cfb92d14SAndroid Build Coastguard Worker
299*cfb92d14SAndroid Build Coastguard Worker    @property
300*cfb92d14SAndroid Build Coastguard Worker    def destination_address(self):
301*cfb92d14SAndroid Build Coastguard Worker        return self._destination_address
302*cfb92d14SAndroid Build Coastguard Worker
303*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
304*cfb92d14SAndroid Build Coastguard Worker        data = bytearray([
305*cfb92d14SAndroid Build Coastguard Worker            ((self.version & 0x0F) << 4) | ((self.traffic_class >> 4) & 0x0F),
306*cfb92d14SAndroid Build Coastguard Worker            ((self.traffic_class & 0x0F) << 4) | ((self.flow_label >> 16) & 0x0F),
307*cfb92d14SAndroid Build Coastguard Worker            ((self.flow_label >> 8) & 0xff),
308*cfb92d14SAndroid Build Coastguard Worker            ((self.flow_label & 0xff)),
309*cfb92d14SAndroid Build Coastguard Worker        ])
310*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">H", self.payload_length)
311*cfb92d14SAndroid Build Coastguard Worker        data += bytearray([self.next_header, self.hop_limit])
312*cfb92d14SAndroid Build Coastguard Worker        data += self.source_address.packed
313*cfb92d14SAndroid Build Coastguard Worker        data += self.destination_address.packed
314*cfb92d14SAndroid Build Coastguard Worker
315*cfb92d14SAndroid Build Coastguard Worker        return data
316*cfb92d14SAndroid Build Coastguard Worker
317*cfb92d14SAndroid Build Coastguard Worker    @classmethod
318*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
319*cfb92d14SAndroid Build Coastguard Worker        b = bytearray(data.read(4))
320*cfb92d14SAndroid Build Coastguard Worker
321*cfb92d14SAndroid Build Coastguard Worker        (b[0] >> 4) & 0x0F
322*cfb92d14SAndroid Build Coastguard Worker        traffic_class = ((b[0] & 0x0F) << 4) | ((b[1] >> 4) & 0x0F)
323*cfb92d14SAndroid Build Coastguard Worker        flow_label = ((b[1] & 0x0F) << 16) | (b[2] << 8) | b[3]
324*cfb92d14SAndroid Build Coastguard Worker
325*cfb92d14SAndroid Build Coastguard Worker        payload_length = struct.unpack(">H", data.read(2))[0]
326*cfb92d14SAndroid Build Coastguard Worker        next_header = ord(data.read(1))
327*cfb92d14SAndroid Build Coastguard Worker        hop_limit = ord(data.read(1))
328*cfb92d14SAndroid Build Coastguard Worker        src_addr = bytearray(data.read(16))
329*cfb92d14SAndroid Build Coastguard Worker        dst_addr = bytearray(data.read(16))
330*cfb92d14SAndroid Build Coastguard Worker
331*cfb92d14SAndroid Build Coastguard Worker        return cls(
332*cfb92d14SAndroid Build Coastguard Worker            src_addr,
333*cfb92d14SAndroid Build Coastguard Worker            dst_addr,
334*cfb92d14SAndroid Build Coastguard Worker            traffic_class,
335*cfb92d14SAndroid Build Coastguard Worker            flow_label,
336*cfb92d14SAndroid Build Coastguard Worker            hop_limit,
337*cfb92d14SAndroid Build Coastguard Worker            payload_length,
338*cfb92d14SAndroid Build Coastguard Worker            next_header,
339*cfb92d14SAndroid Build Coastguard Worker        )
340*cfb92d14SAndroid Build Coastguard Worker
341*cfb92d14SAndroid Build Coastguard Worker    def __repr__(self):
342*cfb92d14SAndroid Build Coastguard Worker        return "IPv6Header(source_address={}, destination_address={}, next_header={}, payload_length={}, \
343*cfb92d14SAndroid Build Coastguard Worker            hop_limit={}, traffic_class={}, flow_label={})".format(
344*cfb92d14SAndroid Build Coastguard Worker            self.source_address.compressed,
345*cfb92d14SAndroid Build Coastguard Worker            self.destination_address.compressed,
346*cfb92d14SAndroid Build Coastguard Worker            self.next_header,
347*cfb92d14SAndroid Build Coastguard Worker            self.payload_length,
348*cfb92d14SAndroid Build Coastguard Worker            self.hop_limit,
349*cfb92d14SAndroid Build Coastguard Worker            self.traffic_class,
350*cfb92d14SAndroid Build Coastguard Worker            self.flow_label,
351*cfb92d14SAndroid Build Coastguard Worker        )
352*cfb92d14SAndroid Build Coastguard Worker
353*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
354*cfb92d14SAndroid Build Coastguard Worker        return self._header_length
355*cfb92d14SAndroid Build Coastguard Worker
356*cfb92d14SAndroid Build Coastguard Worker
357*cfb92d14SAndroid Build Coastguard Workerclass IPv6Packet(ConvertibleToBytes):
358*cfb92d14SAndroid Build Coastguard Worker    """ Class representing IPv6 packet.
359*cfb92d14SAndroid Build Coastguard Worker
360*cfb92d14SAndroid Build Coastguard Worker    IPv6 packet consists of IPv6 header, optional extension header, and upper layer protocol.
361*cfb92d14SAndroid Build Coastguard Worker
362*cfb92d14SAndroid Build Coastguard Worker                                            IPv6 packet
363*cfb92d14SAndroid Build Coastguard Worker
364*cfb92d14SAndroid Build Coastguard Worker    +-------------+----------------------------------+----------------------------------------------+
365*cfb92d14SAndroid Build Coastguard Worker    |             |                                  |                                              |
366*cfb92d14SAndroid Build Coastguard Worker    | IPv6 header | extension headers (zero or more) | upper layer protocol (e.g. UDP, TCP, ICMPv6) |
367*cfb92d14SAndroid Build Coastguard Worker    |             |                                  |                                              |
368*cfb92d14SAndroid Build Coastguard Worker    +-------------+----------------------------------+----------------------------------------------+
369*cfb92d14SAndroid Build Coastguard Worker
370*cfb92d14SAndroid Build Coastguard Worker    Extension headers:
371*cfb92d14SAndroid Build Coastguard Worker        - HopByHop
372*cfb92d14SAndroid Build Coastguard Worker        - Routing header (not implemented in this module)
373*cfb92d14SAndroid Build Coastguard Worker        - Fragment Header
374*cfb92d14SAndroid Build Coastguard Worker
375*cfb92d14SAndroid Build Coastguard Worker    Upper layer protocols:
376*cfb92d14SAndroid Build Coastguard Worker        - ICMPv6
377*cfb92d14SAndroid Build Coastguard Worker        - UDP
378*cfb92d14SAndroid Build Coastguard Worker        - TCP (not implemented in this module)
379*cfb92d14SAndroid Build Coastguard Worker
380*cfb92d14SAndroid Build Coastguard Worker    Example:
381*cfb92d14SAndroid Build Coastguard Worker        IPv6 packet construction without extension headers:
382*cfb92d14SAndroid Build Coastguard Worker
383*cfb92d14SAndroid Build Coastguard Worker        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
384*cfb92d14SAndroid Build Coastguard Worker                                 ICMPv6(ICMPv6Header(128, 0),
385*cfb92d14SAndroid Build Coastguard Worker                                        ICMPv6EchoBody(0, 2,  bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
386*cfb92d14SAndroid Build Coastguard Worker                                                                     0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
387*cfb92d14SAndroid Build Coastguard Worker                                                                     0x41, 0x41]))))
388*cfb92d14SAndroid Build Coastguard Worker
389*cfb92d14SAndroid Build Coastguard Worker        IPv6 packet construction with extension headers:
390*cfb92d14SAndroid Build Coastguard Worker
391*cfb92d14SAndroid Build Coastguard Worker        ipv6_packet = IPv6Packet(IPv6Header("fd00:1234:4555::ff:fe00:1800", "ff03::1"),
392*cfb92d14SAndroid Build Coastguard Worker                                 ICMPv6(ICMPv6Header(128, 0),
393*cfb92d14SAndroid Build Coastguard Worker                                        ICMPv6EchoBody(0, 2,  bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
394*cfb92d14SAndroid Build Coastguard Worker                                                                     0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
395*cfb92d14SAndroid Build Coastguard Worker                                                                     0x41, 0x41])),
396*cfb92d14SAndroid Build Coastguard Worker                                 [HopByHop(options=[
397*cfb92d14SAndroid Build Coastguard Worker                                     HopByHopOption(HopByHopOptionHeader(_type=0x6d),
398*cfb92d14SAndroid Build Coastguard Worker                                                    MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
399*cfb92d14SAndroid Build Coastguard Worker                                 ])])
400*cfb92d14SAndroid Build Coastguard Worker
401*cfb92d14SAndroid Build Coastguard Worker    """
402*cfb92d14SAndroid Build Coastguard Worker
403*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, ipv6_header, upper_layer_protocol, extension_headers=None):
404*cfb92d14SAndroid Build Coastguard Worker        self.ipv6_header = ipv6_header
405*cfb92d14SAndroid Build Coastguard Worker
406*cfb92d14SAndroid Build Coastguard Worker        self.upper_layer_protocol = upper_layer_protocol
407*cfb92d14SAndroid Build Coastguard Worker
408*cfb92d14SAndroid Build Coastguard Worker        self.extension_headers = (extension_headers if extension_headers is not None else [])
409*cfb92d14SAndroid Build Coastguard Worker
410*cfb92d14SAndroid Build Coastguard Worker        self._update_next_header_values_in_headers()
411*cfb92d14SAndroid Build Coastguard Worker
412*cfb92d14SAndroid Build Coastguard Worker        if not upper_layer_protocol.is_valid_checksum():
413*cfb92d14SAndroid Build Coastguard Worker            self.upper_layer_protocol.checksum = self.calculate_checksum()
414*cfb92d14SAndroid Build Coastguard Worker
415*cfb92d14SAndroid Build Coastguard Worker    def _validate_checksum(self):
416*cfb92d14SAndroid Build Coastguard Worker        checksum = self.calculate_checksum()
417*cfb92d14SAndroid Build Coastguard Worker
418*cfb92d14SAndroid Build Coastguard Worker        if self.upper_layer_protocol.checksum != checksum:
419*cfb92d14SAndroid Build Coastguard Worker            raise RuntimeError("Could not create IPv6 packet. "
420*cfb92d14SAndroid Build Coastguard Worker                               "Invalid checksum: {}!={}".format(self.upper_layer_protocol.checksum, checksum))
421*cfb92d14SAndroid Build Coastguard Worker
422*cfb92d14SAndroid Build Coastguard Worker        self.upper_layer_protocol.checksum = checksum
423*cfb92d14SAndroid Build Coastguard Worker
424*cfb92d14SAndroid Build Coastguard Worker    def _update_payload_length_value_in_ipv6_header(self):
425*cfb92d14SAndroid Build Coastguard Worker        self.ipv6_header.payload_length = len(self.upper_layer_protocol) + sum(
426*cfb92d14SAndroid Build Coastguard Worker            [len(extension_header) for extension_header in self.extension_headers])
427*cfb92d14SAndroid Build Coastguard Worker
428*cfb92d14SAndroid Build Coastguard Worker    def _update_next_header_values_in_headers(self):
429*cfb92d14SAndroid Build Coastguard Worker        last_header = self.ipv6_header
430*cfb92d14SAndroid Build Coastguard Worker
431*cfb92d14SAndroid Build Coastguard Worker        for extension_header in self.extension_headers:
432*cfb92d14SAndroid Build Coastguard Worker            last_header.next_header = extension_header.type
433*cfb92d14SAndroid Build Coastguard Worker            last_header = extension_header
434*cfb92d14SAndroid Build Coastguard Worker
435*cfb92d14SAndroid Build Coastguard Worker        last_header.next_header = self.upper_layer_protocol.type
436*cfb92d14SAndroid Build Coastguard Worker
437*cfb92d14SAndroid Build Coastguard Worker    def calculate_checksum(self):
438*cfb92d14SAndroid Build Coastguard Worker        saved_checksum = self.upper_layer_protocol.checksum
439*cfb92d14SAndroid Build Coastguard Worker
440*cfb92d14SAndroid Build Coastguard Worker        self.upper_layer_protocol.checksum = 0
441*cfb92d14SAndroid Build Coastguard Worker
442*cfb92d14SAndroid Build Coastguard Worker        upper_layer_protocol_bytes = self.upper_layer_protocol.to_bytes()
443*cfb92d14SAndroid Build Coastguard Worker
444*cfb92d14SAndroid Build Coastguard Worker        self.upper_layer_protocol.checksum = saved_checksum
445*cfb92d14SAndroid Build Coastguard Worker
446*cfb92d14SAndroid Build Coastguard Worker        pseudo_header = IPv6PseudoHeader(
447*cfb92d14SAndroid Build Coastguard Worker            self.ipv6_header.source_address,
448*cfb92d14SAndroid Build Coastguard Worker            self.ipv6_header.destination_address,
449*cfb92d14SAndroid Build Coastguard Worker            len(upper_layer_protocol_bytes),
450*cfb92d14SAndroid Build Coastguard Worker            self.upper_layer_protocol.type,
451*cfb92d14SAndroid Build Coastguard Worker        )
452*cfb92d14SAndroid Build Coastguard Worker
453*cfb92d14SAndroid Build Coastguard Worker        return calculate_checksum(pseudo_header.to_bytes() + upper_layer_protocol_bytes)
454*cfb92d14SAndroid Build Coastguard Worker
455*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
456*cfb92d14SAndroid Build Coastguard Worker        self._update_payload_length_value_in_ipv6_header()
457*cfb92d14SAndroid Build Coastguard Worker        self._update_next_header_values_in_headers()
458*cfb92d14SAndroid Build Coastguard Worker        self.upper_layer_protocol.checksum = self.calculate_checksum()
459*cfb92d14SAndroid Build Coastguard Worker
460*cfb92d14SAndroid Build Coastguard Worker        ipv6_packet = self.ipv6_header.to_bytes()
461*cfb92d14SAndroid Build Coastguard Worker
462*cfb92d14SAndroid Build Coastguard Worker        for extension_header in self.extension_headers:
463*cfb92d14SAndroid Build Coastguard Worker            ipv6_packet += extension_header.to_bytes()
464*cfb92d14SAndroid Build Coastguard Worker
465*cfb92d14SAndroid Build Coastguard Worker        ipv6_packet += self.upper_layer_protocol.to_bytes()
466*cfb92d14SAndroid Build Coastguard Worker
467*cfb92d14SAndroid Build Coastguard Worker        return ipv6_packet
468*cfb92d14SAndroid Build Coastguard Worker
469*cfb92d14SAndroid Build Coastguard Worker    def __repr__(self):
470*cfb92d14SAndroid Build Coastguard Worker        return "IPv6Packet(header={}, upper_layer_protocol={})".format(self.ipv6_header, self.upper_layer_protocol)
471*cfb92d14SAndroid Build Coastguard Worker
472*cfb92d14SAndroid Build Coastguard Worker
473*cfb92d14SAndroid Build Coastguard Workerclass UDPHeader(ConvertibleToBytes, BuildableFromBytes):
474*cfb92d14SAndroid Build Coastguard Worker    """ Class representing UDP datagram header.
475*cfb92d14SAndroid Build Coastguard Worker
476*cfb92d14SAndroid Build Coastguard Worker    This header is required to construct UDP datagram.
477*cfb92d14SAndroid Build Coastguard Worker
478*cfb92d14SAndroid Build Coastguard Worker    """
479*cfb92d14SAndroid Build Coastguard Worker
480*cfb92d14SAndroid Build Coastguard Worker    _header_length = 8
481*cfb92d14SAndroid Build Coastguard Worker
482*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, src_port, dst_port, payload_length=0, checksum=0):
483*cfb92d14SAndroid Build Coastguard Worker        self.src_port = src_port
484*cfb92d14SAndroid Build Coastguard Worker        self.dst_port = dst_port
485*cfb92d14SAndroid Build Coastguard Worker
486*cfb92d14SAndroid Build Coastguard Worker        self._payload_length = payload_length
487*cfb92d14SAndroid Build Coastguard Worker        self.checksum = checksum
488*cfb92d14SAndroid Build Coastguard Worker
489*cfb92d14SAndroid Build Coastguard Worker    @property
490*cfb92d14SAndroid Build Coastguard Worker    def type(self):
491*cfb92d14SAndroid Build Coastguard Worker        return 17
492*cfb92d14SAndroid Build Coastguard Worker
493*cfb92d14SAndroid Build Coastguard Worker    @property
494*cfb92d14SAndroid Build Coastguard Worker    def payload_length(self):
495*cfb92d14SAndroid Build Coastguard Worker        return self._payload_length
496*cfb92d14SAndroid Build Coastguard Worker
497*cfb92d14SAndroid Build Coastguard Worker    @payload_length.setter
498*cfb92d14SAndroid Build Coastguard Worker    def payload_length(self, value):
499*cfb92d14SAndroid Build Coastguard Worker        self._payload_length = self._header_length + value
500*cfb92d14SAndroid Build Coastguard Worker
501*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
502*cfb92d14SAndroid Build Coastguard Worker        data = struct.pack(">H", self.src_port)
503*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">H", self.dst_port)
504*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">H", self.payload_length)
505*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">H", self.checksum)
506*cfb92d14SAndroid Build Coastguard Worker
507*cfb92d14SAndroid Build Coastguard Worker        return data
508*cfb92d14SAndroid Build Coastguard Worker
509*cfb92d14SAndroid Build Coastguard Worker    @classmethod
510*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
511*cfb92d14SAndroid Build Coastguard Worker        src_port = struct.unpack(">H", data.read(2))[0]
512*cfb92d14SAndroid Build Coastguard Worker        dst_port = struct.unpack(">H", data.read(2))[0]
513*cfb92d14SAndroid Build Coastguard Worker        payload_length = struct.unpack(">H", data.read(2))[0]
514*cfb92d14SAndroid Build Coastguard Worker        checksum = struct.unpack(">H", data.read(2))[0]
515*cfb92d14SAndroid Build Coastguard Worker
516*cfb92d14SAndroid Build Coastguard Worker        return cls(src_port, dst_port, payload_length, checksum)
517*cfb92d14SAndroid Build Coastguard Worker
518*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
519*cfb92d14SAndroid Build Coastguard Worker        return self._header_length
520*cfb92d14SAndroid Build Coastguard Worker
521*cfb92d14SAndroid Build Coastguard Worker
522*cfb92d14SAndroid Build Coastguard Workerclass UDPDatagram(UpperLayerProtocol):
523*cfb92d14SAndroid Build Coastguard Worker    """ Class representing UDP datagram.
524*cfb92d14SAndroid Build Coastguard Worker
525*cfb92d14SAndroid Build Coastguard Worker    UDP is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.
526*cfb92d14SAndroid Build Coastguard Worker
527*cfb92d14SAndroid Build Coastguard Worker    This class consists of a UDP header and payload. The example below shows how a UDP datagram can be constructed.
528*cfb92d14SAndroid Build Coastguard Worker
529*cfb92d14SAndroid Build Coastguard Worker    Example:
530*cfb92d14SAndroid Build Coastguard Worker        udp_dgram = UDPDatagram(UDPHeader(src_port=19788, dst_port=19788),
531*cfb92d14SAndroid Build Coastguard Worker                                bytes([0x00, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
532*cfb92d14SAndroid Build Coastguard Worker                                       0x00, 0x00, 0x01, 0x09, 0x01, 0x01, 0x0b, 0x03,
533*cfb92d14SAndroid Build Coastguard Worker                                       0x04, 0xc6, 0x69, 0x73, 0x51, 0x0e, 0x01, 0x80,
534*cfb92d14SAndroid Build Coastguard Worker                                       0x12, 0x02, 0x00, 0x01, 0xde, 0xad, 0xbe, 0xef]))
535*cfb92d14SAndroid Build Coastguard Worker
536*cfb92d14SAndroid Build Coastguard Worker    """
537*cfb92d14SAndroid Build Coastguard Worker
538*cfb92d14SAndroid Build Coastguard Worker    @property
539*cfb92d14SAndroid Build Coastguard Worker    def type(self):
540*cfb92d14SAndroid Build Coastguard Worker        return 17
541*cfb92d14SAndroid Build Coastguard Worker
542*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, header, payload):
543*cfb92d14SAndroid Build Coastguard Worker        super(UDPDatagram, self).__init__(header)
544*cfb92d14SAndroid Build Coastguard Worker        self.payload = payload
545*cfb92d14SAndroid Build Coastguard Worker
546*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
547*cfb92d14SAndroid Build Coastguard Worker        self.header.payload_length = len(self.payload)
548*cfb92d14SAndroid Build Coastguard Worker
549*cfb92d14SAndroid Build Coastguard Worker        data = bytearray()
550*cfb92d14SAndroid Build Coastguard Worker        data += self.header.to_bytes()
551*cfb92d14SAndroid Build Coastguard Worker        data += self.payload.to_bytes()
552*cfb92d14SAndroid Build Coastguard Worker        return data
553*cfb92d14SAndroid Build Coastguard Worker
554*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
555*cfb92d14SAndroid Build Coastguard Worker        return len(self.header) + len(self.payload)
556*cfb92d14SAndroid Build Coastguard Worker
557*cfb92d14SAndroid Build Coastguard Worker
558*cfb92d14SAndroid Build Coastguard Workerclass ICMPv6Header(ConvertibleToBytes, BuildableFromBytes):
559*cfb92d14SAndroid Build Coastguard Worker    """ Class representing ICMPv6 message header.
560*cfb92d14SAndroid Build Coastguard Worker
561*cfb92d14SAndroid Build Coastguard Worker    This header is required to construct ICMPv6 message.
562*cfb92d14SAndroid Build Coastguard Worker
563*cfb92d14SAndroid Build Coastguard Worker    """
564*cfb92d14SAndroid Build Coastguard Worker
565*cfb92d14SAndroid Build Coastguard Worker    _header_length = 4
566*cfb92d14SAndroid Build Coastguard Worker
567*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, _type, code, checksum=0):
568*cfb92d14SAndroid Build Coastguard Worker        self.type = _type
569*cfb92d14SAndroid Build Coastguard Worker        self.code = code
570*cfb92d14SAndroid Build Coastguard Worker
571*cfb92d14SAndroid Build Coastguard Worker        self.checksum = checksum
572*cfb92d14SAndroid Build Coastguard Worker
573*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
574*cfb92d14SAndroid Build Coastguard Worker        return bytearray([self.type, self.code]) + struct.pack(">H", self.checksum)
575*cfb92d14SAndroid Build Coastguard Worker
576*cfb92d14SAndroid Build Coastguard Worker    @classmethod
577*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
578*cfb92d14SAndroid Build Coastguard Worker        _type = ord(data.read(1))
579*cfb92d14SAndroid Build Coastguard Worker        code = ord(data.read(1))
580*cfb92d14SAndroid Build Coastguard Worker        checksum = struct.unpack(">H", data.read(2))[0]
581*cfb92d14SAndroid Build Coastguard Worker
582*cfb92d14SAndroid Build Coastguard Worker        return cls(_type, code, checksum)
583*cfb92d14SAndroid Build Coastguard Worker
584*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
585*cfb92d14SAndroid Build Coastguard Worker        return self._header_length
586*cfb92d14SAndroid Build Coastguard Worker
587*cfb92d14SAndroid Build Coastguard Worker
588*cfb92d14SAndroid Build Coastguard Workerclass ICMPv6(UpperLayerProtocol):
589*cfb92d14SAndroid Build Coastguard Worker    """ Class representing ICMPv6 message.
590*cfb92d14SAndroid Build Coastguard Worker
591*cfb92d14SAndroid Build Coastguard Worker    ICMPv6 is an upper layer protocol for IPv6 so it can be passed to IPv6 packet as upper_layer_protocol.
592*cfb92d14SAndroid Build Coastguard Worker
593*cfb92d14SAndroid Build Coastguard Worker    This class consists of an ICMPv6 header and body. The example below shows how an ICMPv6 message can be constructed.
594*cfb92d14SAndroid Build Coastguard Worker
595*cfb92d14SAndroid Build Coastguard Worker    Example:
596*cfb92d14SAndroid Build Coastguard Worker        icmpv6_msg = ICMPv6(ICMPv6Header(128, 0),
597*cfb92d14SAndroid Build Coastguard Worker                            ICMPv6EchoBody(0, 2, bytes([0x80, 0x00, 0xc7, 0xbf, 0x00, 0x00, 0x00, 0x01,
598*cfb92d14SAndroid Build Coastguard Worker                                                        0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41,
599*cfb92d14SAndroid Build Coastguard Worker                                                        0x41, 0x41])))
600*cfb92d14SAndroid Build Coastguard Worker
601*cfb92d14SAndroid Build Coastguard Worker    """
602*cfb92d14SAndroid Build Coastguard Worker
603*cfb92d14SAndroid Build Coastguard Worker    @property
604*cfb92d14SAndroid Build Coastguard Worker    def type(self):
605*cfb92d14SAndroid Build Coastguard Worker        return 58
606*cfb92d14SAndroid Build Coastguard Worker
607*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, header, body):
608*cfb92d14SAndroid Build Coastguard Worker        super(ICMPv6, self).__init__(header)
609*cfb92d14SAndroid Build Coastguard Worker        self.body = body
610*cfb92d14SAndroid Build Coastguard Worker
611*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
612*cfb92d14SAndroid Build Coastguard Worker        return bytearray(self.header.to_bytes() + self.body.to_bytes())
613*cfb92d14SAndroid Build Coastguard Worker
614*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
615*cfb92d14SAndroid Build Coastguard Worker        return len(self.header) + len(self.body)
616*cfb92d14SAndroid Build Coastguard Worker
617*cfb92d14SAndroid Build Coastguard Worker
618*cfb92d14SAndroid Build Coastguard Workerclass FragmentHeader(ExtensionHeader):
619*cfb92d14SAndroid Build Coastguard Worker    """ Class representing Fragment extension header.
620*cfb92d14SAndroid Build Coastguard Worker
621*cfb92d14SAndroid Build Coastguard Worker    +-------------+----------+-----------------+-----+---+----------------+
622*cfb92d14SAndroid Build Coastguard Worker    | Next Header | Reserved | Fragment Offset | Res | M | Identification |
623*cfb92d14SAndroid Build Coastguard Worker    +-------------+----------+-----------------+-----+---+----------------+
624*cfb92d14SAndroid Build Coastguard Worker
625*cfb92d14SAndroid Build Coastguard Worker    Fragment extension header consists of:
626*cfb92d14SAndroid Build Coastguard Worker        - next_header type (8 bit)
627*cfb92d14SAndroid Build Coastguard Worker        - fragment offset which is multiple of 8 (13 bit)
628*cfb92d14SAndroid Build Coastguard Worker        - more_flag to indicate further data (1 bit)
629*cfb92d14SAndroid Build Coastguard Worker        - identification for all associated fragments (32 bit)
630*cfb92d14SAndroid Build Coastguard Worker    """
631*cfb92d14SAndroid Build Coastguard Worker
632*cfb92d14SAndroid Build Coastguard Worker    @property
633*cfb92d14SAndroid Build Coastguard Worker    def type(self):
634*cfb92d14SAndroid Build Coastguard Worker        return 44
635*cfb92d14SAndroid Build Coastguard Worker
636*cfb92d14SAndroid Build Coastguard Worker    @property
637*cfb92d14SAndroid Build Coastguard Worker    def identification(self):
638*cfb92d14SAndroid Build Coastguard Worker        return self._identification
639*cfb92d14SAndroid Build Coastguard Worker
640*cfb92d14SAndroid Build Coastguard Worker    @property
641*cfb92d14SAndroid Build Coastguard Worker    def more_flag(self):
642*cfb92d14SAndroid Build Coastguard Worker        return self._more_flag
643*cfb92d14SAndroid Build Coastguard Worker
644*cfb92d14SAndroid Build Coastguard Worker    @property
645*cfb92d14SAndroid Build Coastguard Worker    def offset(self):
646*cfb92d14SAndroid Build Coastguard Worker        return self._fragm_offset
647*cfb92d14SAndroid Build Coastguard Worker
648*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, next_header=None, fragm_offset=0, more_flag=False, identification=0):
649*cfb92d14SAndroid Build Coastguard Worker        super(FragmentHeader, self).__init__(next_header, 0)
650*cfb92d14SAndroid Build Coastguard Worker        self._fragm_offset = fragm_offset
651*cfb92d14SAndroid Build Coastguard Worker        self._more_flag = more_flag
652*cfb92d14SAndroid Build Coastguard Worker        self._identification = identification
653*cfb92d14SAndroid Build Coastguard Worker
654*cfb92d14SAndroid Build Coastguard Worker    def callculate_offset(self, position):
655*cfb92d14SAndroid Build Coastguard Worker        return position >> 3
656*cfb92d14SAndroid Build Coastguard Worker
657*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
658*cfb92d14SAndroid Build Coastguard Worker        data = bytearray([self.next_header, 0x00])
659*cfb92d14SAndroid Build Coastguard Worker        data += bytearray([self._fragm_offset >> 5, ((self._fragm_offset << 3) | self._more_flag) & 0xff])
660*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">I", self._identification)
661*cfb92d14SAndroid Build Coastguard Worker
662*cfb92d14SAndroid Build Coastguard Worker        return data
663*cfb92d14SAndroid Build Coastguard Worker
664*cfb92d14SAndroid Build Coastguard Worker    @classmethod
665*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
666*cfb92d14SAndroid Build Coastguard Worker        next_header = struct.unpack(">B", data.read(1))[0]
667*cfb92d14SAndroid Build Coastguard Worker        struct.unpack(">B", data.read(1))[0]  # reserved
668*cfb92d14SAndroid Build Coastguard Worker        fragment_offset = struct.unpack(">H", data.read(2))[0]
669*cfb92d14SAndroid Build Coastguard Worker        more_flag = fragment_offset & 0x1
670*cfb92d14SAndroid Build Coastguard Worker        identificaton = struct.unpack(">I", data.read(4))[0]
671*cfb92d14SAndroid Build Coastguard Worker
672*cfb92d14SAndroid Build Coastguard Worker        fragment_offset = fragment_offset >> 3
673*cfb92d14SAndroid Build Coastguard Worker
674*cfb92d14SAndroid Build Coastguard Worker        return cls(next_header, fragment_offset, more_flag, identificaton)
675*cfb92d14SAndroid Build Coastguard Worker
676*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
677*cfb92d14SAndroid Build Coastguard Worker        return 64
678*cfb92d14SAndroid Build Coastguard Worker
679*cfb92d14SAndroid Build Coastguard Worker
680*cfb92d14SAndroid Build Coastguard Workerclass HopByHop(ExtensionHeader):
681*cfb92d14SAndroid Build Coastguard Worker    """ Class representing HopByHop extension header.
682*cfb92d14SAndroid Build Coastguard Worker
683*cfb92d14SAndroid Build Coastguard Worker    HopByHop extension header consists of:
684*cfb92d14SAndroid Build Coastguard Worker        - next_header type
685*cfb92d14SAndroid Build Coastguard Worker        - extension header length which is multiple of 8
686*cfb92d14SAndroid Build Coastguard Worker        - options
687*cfb92d14SAndroid Build Coastguard Worker
688*cfb92d14SAndroid Build Coastguard Worker    """
689*cfb92d14SAndroid Build Coastguard Worker
690*cfb92d14SAndroid Build Coastguard Worker    _one_byte_padding = 0x00
691*cfb92d14SAndroid Build Coastguard Worker    _many_bytes_padding = 0x01
692*cfb92d14SAndroid Build Coastguard Worker
693*cfb92d14SAndroid Build Coastguard Worker    @property
694*cfb92d14SAndroid Build Coastguard Worker    def type(self):
695*cfb92d14SAndroid Build Coastguard Worker        return 0
696*cfb92d14SAndroid Build Coastguard Worker
697*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, next_header=None, options=None, hdr_ext_len=None):
698*cfb92d14SAndroid Build Coastguard Worker        super(HopByHop, self).__init__(next_header, hdr_ext_len)
699*cfb92d14SAndroid Build Coastguard Worker        self.options = options if options is not None else []
700*cfb92d14SAndroid Build Coastguard Worker
701*cfb92d14SAndroid Build Coastguard Worker        if hdr_ext_len is not None:
702*cfb92d14SAndroid Build Coastguard Worker            self.hdr_ext_len = hdr_ext_len
703*cfb92d14SAndroid Build Coastguard Worker        else:
704*cfb92d14SAndroid Build Coastguard Worker            payload_length = self._calculate_payload_length()
705*cfb92d14SAndroid Build Coastguard Worker            self.hdr_ext_len = self._calculate_hdr_ext_len(payload_length)
706*cfb92d14SAndroid Build Coastguard Worker
707*cfb92d14SAndroid Build Coastguard Worker    def _calculate_payload_length(self):
708*cfb92d14SAndroid Build Coastguard Worker        payload_length = 2
709*cfb92d14SAndroid Build Coastguard Worker
710*cfb92d14SAndroid Build Coastguard Worker        for option in self.options:
711*cfb92d14SAndroid Build Coastguard Worker            payload_length += len(option)
712*cfb92d14SAndroid Build Coastguard Worker
713*cfb92d14SAndroid Build Coastguard Worker        return payload_length
714*cfb92d14SAndroid Build Coastguard Worker
715*cfb92d14SAndroid Build Coastguard Worker    def _calculate_hdr_ext_len(self, payload_length):
716*cfb92d14SAndroid Build Coastguard Worker        count = payload_length >> 3
717*cfb92d14SAndroid Build Coastguard Worker
718*cfb92d14SAndroid Build Coastguard Worker        if (payload_length & 0x7) == 0 and count > 0:
719*cfb92d14SAndroid Build Coastguard Worker            return count - 1
720*cfb92d14SAndroid Build Coastguard Worker
721*cfb92d14SAndroid Build Coastguard Worker        return count
722*cfb92d14SAndroid Build Coastguard Worker
723*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
724*cfb92d14SAndroid Build Coastguard Worker        data = bytearray([self.next_header, self.hdr_ext_len])
725*cfb92d14SAndroid Build Coastguard Worker
726*cfb92d14SAndroid Build Coastguard Worker        for option in self.options:
727*cfb92d14SAndroid Build Coastguard Worker            data += option.to_bytes()
728*cfb92d14SAndroid Build Coastguard Worker
729*cfb92d14SAndroid Build Coastguard Worker        # Padding
730*cfb92d14SAndroid Build Coastguard Worker        #
731*cfb92d14SAndroid Build Coastguard Worker        # More details:
732*cfb92d14SAndroid Build Coastguard Worker        #   https://tools.ietf.org/html/rfc2460#section-4.2
733*cfb92d14SAndroid Build Coastguard Worker        #
734*cfb92d14SAndroid Build Coastguard Worker        excess_bytes = len(data) & 0x7
735*cfb92d14SAndroid Build Coastguard Worker
736*cfb92d14SAndroid Build Coastguard Worker        if excess_bytes > 0:
737*cfb92d14SAndroid Build Coastguard Worker            padding_length = 8 - excess_bytes
738*cfb92d14SAndroid Build Coastguard Worker
739*cfb92d14SAndroid Build Coastguard Worker            if padding_length == 1:
740*cfb92d14SAndroid Build Coastguard Worker                data += bytearray([self._one_byte_padding])
741*cfb92d14SAndroid Build Coastguard Worker
742*cfb92d14SAndroid Build Coastguard Worker            else:
743*cfb92d14SAndroid Build Coastguard Worker                padding_length -= 2
744*cfb92d14SAndroid Build Coastguard Worker                data += bytearray([self._many_bytes_padding, padding_length])
745*cfb92d14SAndroid Build Coastguard Worker                data += bytearray([0x00 for _ in range(padding_length)])
746*cfb92d14SAndroid Build Coastguard Worker
747*cfb92d14SAndroid Build Coastguard Worker        return data
748*cfb92d14SAndroid Build Coastguard Worker
749*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
750*cfb92d14SAndroid Build Coastguard Worker        """ HopByHop extension header length
751*cfb92d14SAndroid Build Coastguard Worker
752*cfb92d14SAndroid Build Coastguard Worker        More details:
753*cfb92d14SAndroid Build Coastguard Worker            https://tools.ietf.org/html/rfc2460#section-4.3
754*cfb92d14SAndroid Build Coastguard Worker
755*cfb92d14SAndroid Build Coastguard Worker        """
756*cfb92d14SAndroid Build Coastguard Worker        return (self.hdr_ext_len + 1) * 8
757*cfb92d14SAndroid Build Coastguard Worker
758*cfb92d14SAndroid Build Coastguard Worker
759*cfb92d14SAndroid Build Coastguard Workerclass HopByHopOptionHeader(ConvertibleToBytes, BuildableFromBytes):
760*cfb92d14SAndroid Build Coastguard Worker    """ Class representing HopByHop option header. """
761*cfb92d14SAndroid Build Coastguard Worker
762*cfb92d14SAndroid Build Coastguard Worker    _header_length = 2
763*cfb92d14SAndroid Build Coastguard Worker
764*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, _type, length=None):
765*cfb92d14SAndroid Build Coastguard Worker        self.type = _type
766*cfb92d14SAndroid Build Coastguard Worker        self.length = length if length is not None else 0
767*cfb92d14SAndroid Build Coastguard Worker
768*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
769*cfb92d14SAndroid Build Coastguard Worker        return bytearray([self.type, self.length])
770*cfb92d14SAndroid Build Coastguard Worker
771*cfb92d14SAndroid Build Coastguard Worker    @classmethod
772*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
773*cfb92d14SAndroid Build Coastguard Worker        _type = ord(data.read(1))
774*cfb92d14SAndroid Build Coastguard Worker        length = ord(data.read(1))
775*cfb92d14SAndroid Build Coastguard Worker        return cls(_type, length)
776*cfb92d14SAndroid Build Coastguard Worker
777*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
778*cfb92d14SAndroid Build Coastguard Worker        return self._header_length
779*cfb92d14SAndroid Build Coastguard Worker
780*cfb92d14SAndroid Build Coastguard Worker    def __repr__(self):
781*cfb92d14SAndroid Build Coastguard Worker        return "HopByHopOptionHeader(type={}, length={})".format(self.type, self.length)
782*cfb92d14SAndroid Build Coastguard Worker
783*cfb92d14SAndroid Build Coastguard Worker
784*cfb92d14SAndroid Build Coastguard Workerclass HopByHopOption(ConvertibleToBytes):
785*cfb92d14SAndroid Build Coastguard Worker    """ Class representing HopByHop option.
786*cfb92d14SAndroid Build Coastguard Worker
787*cfb92d14SAndroid Build Coastguard Worker    Class consists of two elements: HopByHopOptionHeader and value (e.g. for MPLOption).
788*cfb92d14SAndroid Build Coastguard Worker
789*cfb92d14SAndroid Build Coastguard Worker    The following example shows how any HopByHop option can be constructed.
790*cfb92d14SAndroid Build Coastguard Worker
791*cfb92d14SAndroid Build Coastguard Worker    Example:
792*cfb92d14SAndroid Build Coastguard Worker        HopByHop(next_header=0x3a,
793*cfb92d14SAndroid Build Coastguard Worker                 options=[HopByHopOption(HopByHopOptionHeader(_type=0x6d),
794*cfb92d14SAndroid Build Coastguard Worker                                         MPLOption(S=1, M=0, V=0, sequence=2, seed_id=bytes([0x00, 0x18])))
795*cfb92d14SAndroid Build Coastguard Worker
796*cfb92d14SAndroid Build Coastguard Worker    """
797*cfb92d14SAndroid Build Coastguard Worker
798*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, header, value):
799*cfb92d14SAndroid Build Coastguard Worker        self.value = value
800*cfb92d14SAndroid Build Coastguard Worker
801*cfb92d14SAndroid Build Coastguard Worker        self.header = header
802*cfb92d14SAndroid Build Coastguard Worker        self.header.length = len(self.value)
803*cfb92d14SAndroid Build Coastguard Worker
804*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
805*cfb92d14SAndroid Build Coastguard Worker        return self.header.to_bytes() + self.value.to_bytes()
806*cfb92d14SAndroid Build Coastguard Worker
807*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
808*cfb92d14SAndroid Build Coastguard Worker        return len(self.header) + len(self.value)
809*cfb92d14SAndroid Build Coastguard Worker
810*cfb92d14SAndroid Build Coastguard Worker    def __repr__(self):
811*cfb92d14SAndroid Build Coastguard Worker        return "HopByHopOption(header={}, value={})".format(self.header, self.value)
812*cfb92d14SAndroid Build Coastguard Worker
813*cfb92d14SAndroid Build Coastguard Worker
814*cfb92d14SAndroid Build Coastguard Workerclass MPLOption(ConvertibleToBytes):
815*cfb92d14SAndroid Build Coastguard Worker    """ Class representing MPL option. """
816*cfb92d14SAndroid Build Coastguard Worker
817*cfb92d14SAndroid Build Coastguard Worker    _header_length = 2
818*cfb92d14SAndroid Build Coastguard Worker
819*cfb92d14SAndroid Build Coastguard Worker    _seed_id_length = {0: 0, 1: 2, 2: 8, 3: 16}
820*cfb92d14SAndroid Build Coastguard Worker
821*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, S, M, V, sequence, seed_id):
822*cfb92d14SAndroid Build Coastguard Worker        self.S = S
823*cfb92d14SAndroid Build Coastguard Worker        self.M = M
824*cfb92d14SAndroid Build Coastguard Worker        self.V = V
825*cfb92d14SAndroid Build Coastguard Worker        self.sequence = sequence
826*cfb92d14SAndroid Build Coastguard Worker        self.seed_id = seed_id
827*cfb92d14SAndroid Build Coastguard Worker
828*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
829*cfb92d14SAndroid Build Coastguard Worker        smv = (((self.S & 0x03) << 6) | ((self.M & 0x01) << 5) | ((self.V & 0x01) << 4))
830*cfb92d14SAndroid Build Coastguard Worker
831*cfb92d14SAndroid Build Coastguard Worker        return bytearray([smv, self.sequence]) + self.seed_id
832*cfb92d14SAndroid Build Coastguard Worker
833*cfb92d14SAndroid Build Coastguard Worker    @classmethod
834*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
835*cfb92d14SAndroid Build Coastguard Worker        b = ord(data.read(1))
836*cfb92d14SAndroid Build Coastguard Worker
837*cfb92d14SAndroid Build Coastguard Worker        s = (b >> 6) & 0x03
838*cfb92d14SAndroid Build Coastguard Worker        m = (b >> 5) & 0x01
839*cfb92d14SAndroid Build Coastguard Worker        v = (b >> 4) & 0x01
840*cfb92d14SAndroid Build Coastguard Worker
841*cfb92d14SAndroid Build Coastguard Worker        sequence = ord(data.read(1))
842*cfb92d14SAndroid Build Coastguard Worker        seed_id = data.read(cls._seed_id_length[s])
843*cfb92d14SAndroid Build Coastguard Worker
844*cfb92d14SAndroid Build Coastguard Worker        return cls(s, m, v, sequence, seed_id)
845*cfb92d14SAndroid Build Coastguard Worker
846*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
847*cfb92d14SAndroid Build Coastguard Worker        return self._header_length + self._seed_id_length[self.S]
848*cfb92d14SAndroid Build Coastguard Worker
849*cfb92d14SAndroid Build Coastguard Worker    def __repr__(self):
850*cfb92d14SAndroid Build Coastguard Worker        return "MPLOption(S={}, M={}, V={}, sequence={}, seed_id={})".format(self.S, self.M, self.V, self.sequence,
851*cfb92d14SAndroid Build Coastguard Worker                                                                             hexlify(self.seed_id))
852*cfb92d14SAndroid Build Coastguard Worker
853*cfb92d14SAndroid Build Coastguard Worker
854*cfb92d14SAndroid Build Coastguard Workerclass IPv6PacketFactory(PacketFactory):
855*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces IPv6 packets from data.
856*cfb92d14SAndroid Build Coastguard Worker
857*cfb92d14SAndroid Build Coastguard Worker    This factory must be initialized with factories which allow to parse extension headers and upper layer protocols.
858*cfb92d14SAndroid Build Coastguard Worker
859*cfb92d14SAndroid Build Coastguard Worker    The following example shows preferable setup of IPv6PacketFactory.
860*cfb92d14SAndroid Build Coastguard Worker
861*cfb92d14SAndroid Build Coastguard Worker    Header types:
862*cfb92d14SAndroid Build Coastguard Worker        0: HopByHop
863*cfb92d14SAndroid Build Coastguard Worker        17: UDP
864*cfb92d14SAndroid Build Coastguard Worker        58: ICMPv6
865*cfb92d14SAndroid Build Coastguard Worker
866*cfb92d14SAndroid Build Coastguard Worker    Option types:
867*cfb92d14SAndroid Build Coastguard Worker        109: MPL
868*cfb92d14SAndroid Build Coastguard Worker
869*cfb92d14SAndroid Build Coastguard Worker    ICMPv6 body types:
870*cfb92d14SAndroid Build Coastguard Worker        128: Echo request
871*cfb92d14SAndroid Build Coastguard Worker        129: Echo response
872*cfb92d14SAndroid Build Coastguard Worker
873*cfb92d14SAndroid Build Coastguard Worker    Example usage:
874*cfb92d14SAndroid Build Coastguard Worker
875*cfb92d14SAndroid Build Coastguard Worker        ipv6_factory = IPv6PacketFactory(
876*cfb92d14SAndroid Build Coastguard Worker            ehf={
877*cfb92d14SAndroid Build Coastguard Worker                0: HopByHopFactory(options_factories={
878*cfb92d14SAndroid Build Coastguard Worker                    109: MPLOptionFactory()
879*cfb92d14SAndroid Build Coastguard Worker                })
880*cfb92d14SAndroid Build Coastguard Worker            },
881*cfb92d14SAndroid Build Coastguard Worker            ulpf={
882*cfb92d14SAndroid Build Coastguard Worker                17: UDPDatagramFactory(dst_port_factories={
883*cfb92d14SAndroid Build Coastguard Worker                    19788: MLEMessageFactory(),
884*cfb92d14SAndroid Build Coastguard Worker                    19789: CoAPMessageFactory()
885*cfb92d14SAndroid Build Coastguard Worker                }),
886*cfb92d14SAndroid Build Coastguard Worker                58: ICMPv6Factory(body_factories={
887*cfb92d14SAndroid Build Coastguard Worker                    128: ICMPv6EchoBodyFactory(),
888*cfb92d14SAndroid Build Coastguard Worker                    129: ICMPv6EchoBodyFactory()
889*cfb92d14SAndroid Build Coastguard Worker                })
890*cfb92d14SAndroid Build Coastguard Worker            }
891*cfb92d14SAndroid Build Coastguard Worker        )
892*cfb92d14SAndroid Build Coastguard Worker
893*cfb92d14SAndroid Build Coastguard Worker    """
894*cfb92d14SAndroid Build Coastguard Worker
895*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, ehf=None, ulpf=None):
896*cfb92d14SAndroid Build Coastguard Worker        """
897*cfb92d14SAndroid Build Coastguard Worker        ehf - Extension Header Factory
898*cfb92d14SAndroid Build Coastguard Worker        ulpf - Upper Layer Protocol Factory
899*cfb92d14SAndroid Build Coastguard Worker
900*cfb92d14SAndroid Build Coastguard Worker        Args:
901*cfb92d14SAndroid Build Coastguard Worker            ehf(dict[int: PacketFactory]): Dictionary mapping extension header types on specialized factories.
902*cfb92d14SAndroid Build Coastguard Worker            ulpf(dict[int: PacketFactory]): Dictionary mapping upper layer protocol types on specialized factories.
903*cfb92d14SAndroid Build Coastguard Worker        """
904*cfb92d14SAndroid Build Coastguard Worker        self._ehf = ehf if ehf is not None else {}
905*cfb92d14SAndroid Build Coastguard Worker        self._ulpf = ulpf if ulpf is not None else {}
906*cfb92d14SAndroid Build Coastguard Worker
907*cfb92d14SAndroid Build Coastguard Worker    def _is_extension_header(self, header_type):
908*cfb92d14SAndroid Build Coastguard Worker        return header_type not in UPPER_LAYER_PROTOCOLS
909*cfb92d14SAndroid Build Coastguard Worker
910*cfb92d14SAndroid Build Coastguard Worker    def _get_extension_header_factory_for(self, next_header):
911*cfb92d14SAndroid Build Coastguard Worker        try:
912*cfb92d14SAndroid Build Coastguard Worker            return self._ehf[next_header]
913*cfb92d14SAndroid Build Coastguard Worker        except KeyError:
914*cfb92d14SAndroid Build Coastguard Worker            raise RuntimeError("Could not get Extension Header factory for next_header={}.".format(next_header))
915*cfb92d14SAndroid Build Coastguard Worker
916*cfb92d14SAndroid Build Coastguard Worker    def _get_upper_layer_protocol_factory_for(self, next_header):
917*cfb92d14SAndroid Build Coastguard Worker        try:
918*cfb92d14SAndroid Build Coastguard Worker            return self._ulpf[next_header]
919*cfb92d14SAndroid Build Coastguard Worker        except KeyError:
920*cfb92d14SAndroid Build Coastguard Worker            raise RuntimeError("Could not get Upper Layer Protocol factory for next_header={}.".format(next_header))
921*cfb92d14SAndroid Build Coastguard Worker
922*cfb92d14SAndroid Build Coastguard Worker    def _parse_extension_headers(self, data, next_header, message_info):
923*cfb92d14SAndroid Build Coastguard Worker        extension_headers = []
924*cfb92d14SAndroid Build Coastguard Worker
925*cfb92d14SAndroid Build Coastguard Worker        while self._is_extension_header(next_header):
926*cfb92d14SAndroid Build Coastguard Worker            factory = self._get_extension_header_factory_for(next_header)
927*cfb92d14SAndroid Build Coastguard Worker
928*cfb92d14SAndroid Build Coastguard Worker            extension_header = factory.parse(data, message_info)
929*cfb92d14SAndroid Build Coastguard Worker
930*cfb92d14SAndroid Build Coastguard Worker            next_header = extension_header.next_header
931*cfb92d14SAndroid Build Coastguard Worker
932*cfb92d14SAndroid Build Coastguard Worker            extension_headers.append(extension_header)
933*cfb92d14SAndroid Build Coastguard Worker
934*cfb92d14SAndroid Build Coastguard Worker        return next_header, extension_headers
935*cfb92d14SAndroid Build Coastguard Worker
936*cfb92d14SAndroid Build Coastguard Worker    def _parse_upper_layer_protocol(self, data, next_header, message_info):
937*cfb92d14SAndroid Build Coastguard Worker        factory = self._get_upper_layer_protocol_factory_for(next_header)
938*cfb92d14SAndroid Build Coastguard Worker
939*cfb92d14SAndroid Build Coastguard Worker        return factory.parse(data, message_info)
940*cfb92d14SAndroid Build Coastguard Worker
941*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
942*cfb92d14SAndroid Build Coastguard Worker        ipv6_header = IPv6Header.from_bytes(data)
943*cfb92d14SAndroid Build Coastguard Worker
944*cfb92d14SAndroid Build Coastguard Worker        message_info.source_ipv6 = ipv6_header.source_address
945*cfb92d14SAndroid Build Coastguard Worker        message_info.destination_ipv6 = ipv6_header.destination_address
946*cfb92d14SAndroid Build Coastguard Worker
947*cfb92d14SAndroid Build Coastguard Worker        next_header, extension_headers = self._parse_extension_headers(data, ipv6_header.next_header, message_info)
948*cfb92d14SAndroid Build Coastguard Worker
949*cfb92d14SAndroid Build Coastguard Worker        upper_layer_protocol = self._parse_upper_layer_protocol(data, next_header, message_info)
950*cfb92d14SAndroid Build Coastguard Worker
951*cfb92d14SAndroid Build Coastguard Worker        return IPv6Packet(ipv6_header, upper_layer_protocol, extension_headers)
952*cfb92d14SAndroid Build Coastguard Worker
953*cfb92d14SAndroid Build Coastguard Worker
954*cfb92d14SAndroid Build Coastguard Workerclass HopByHopOptionsFactory(object):
955*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces HopByHop options. """
956*cfb92d14SAndroid Build Coastguard Worker
957*cfb92d14SAndroid Build Coastguard Worker    _one_byte_padding = 0x00
958*cfb92d14SAndroid Build Coastguard Worker    _many_bytes_padding = 0x01
959*cfb92d14SAndroid Build Coastguard Worker
960*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, options_factories=None):
961*cfb92d14SAndroid Build Coastguard Worker        self._options_factories = (options_factories if options_factories is not None else {})
962*cfb92d14SAndroid Build Coastguard Worker
963*cfb92d14SAndroid Build Coastguard Worker    def _get_HopByHopOption_value_factory(self, _type):
964*cfb92d14SAndroid Build Coastguard Worker        try:
965*cfb92d14SAndroid Build Coastguard Worker            return self._options_factories[_type]
966*cfb92d14SAndroid Build Coastguard Worker        except KeyError:
967*cfb92d14SAndroid Build Coastguard Worker            raise RuntimeError("Could not find HopByHopOption value factory for type={}.".format(_type))
968*cfb92d14SAndroid Build Coastguard Worker
969*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
970*cfb92d14SAndroid Build Coastguard Worker        options = []
971*cfb92d14SAndroid Build Coastguard Worker
972*cfb92d14SAndroid Build Coastguard Worker        while data.tell() < len(data.getvalue()):
973*cfb92d14SAndroid Build Coastguard Worker            option_header = HopByHopOptionHeader.from_bytes(data)
974*cfb92d14SAndroid Build Coastguard Worker
975*cfb92d14SAndroid Build Coastguard Worker            if option_header.type == self._one_byte_padding:
976*cfb92d14SAndroid Build Coastguard Worker                # skip one byte padding
977*cfb92d14SAndroid Build Coastguard Worker                data.read(1)
978*cfb92d14SAndroid Build Coastguard Worker
979*cfb92d14SAndroid Build Coastguard Worker            elif option_header.type == self._many_bytes_padding:
980*cfb92d14SAndroid Build Coastguard Worker                # skip n bytes padding
981*cfb92d14SAndroid Build Coastguard Worker                data.read(option_header.length)
982*cfb92d14SAndroid Build Coastguard Worker
983*cfb92d14SAndroid Build Coastguard Worker            else:
984*cfb92d14SAndroid Build Coastguard Worker                factory = self._get_HopByHopOption_value_factory(option_header.type)
985*cfb92d14SAndroid Build Coastguard Worker
986*cfb92d14SAndroid Build Coastguard Worker                option_data = data.read(option_header.length)
987*cfb92d14SAndroid Build Coastguard Worker
988*cfb92d14SAndroid Build Coastguard Worker                option = HopByHopOption(
989*cfb92d14SAndroid Build Coastguard Worker                    option_header,
990*cfb92d14SAndroid Build Coastguard Worker                    factory.parse(io.BytesIO(option_data), message_info),
991*cfb92d14SAndroid Build Coastguard Worker                )
992*cfb92d14SAndroid Build Coastguard Worker
993*cfb92d14SAndroid Build Coastguard Worker                options.append(option)
994*cfb92d14SAndroid Build Coastguard Worker
995*cfb92d14SAndroid Build Coastguard Worker        return options
996*cfb92d14SAndroid Build Coastguard Worker
997*cfb92d14SAndroid Build Coastguard Worker
998*cfb92d14SAndroid Build Coastguard Workerclass HopByHopFactory(PacketFactory):
999*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces HopByHop extension headers from data. """
1000*cfb92d14SAndroid Build Coastguard Worker
1001*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, hop_by_hop_options_factory):
1002*cfb92d14SAndroid Build Coastguard Worker        self._hop_by_hop_options_factory = hop_by_hop_options_factory
1003*cfb92d14SAndroid Build Coastguard Worker
1004*cfb92d14SAndroid Build Coastguard Worker    def _calculate_extension_header_length(self, hdr_ext_len):
1005*cfb92d14SAndroid Build Coastguard Worker        return (hdr_ext_len + 1) * 8
1006*cfb92d14SAndroid Build Coastguard Worker
1007*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1008*cfb92d14SAndroid Build Coastguard Worker        next_header = ord(data.read(1))
1009*cfb92d14SAndroid Build Coastguard Worker
1010*cfb92d14SAndroid Build Coastguard Worker        hdr_ext_len = ord(data.read(1))
1011*cfb92d14SAndroid Build Coastguard Worker
1012*cfb92d14SAndroid Build Coastguard Worker        # Note! Two bytes were read (next_header and hdr_ext_len) so they must
1013*cfb92d14SAndroid Build Coastguard Worker        # be subtracted from header length
1014*cfb92d14SAndroid Build Coastguard Worker        hop_by_hop_length = (self._calculate_extension_header_length(hdr_ext_len) - 2)
1015*cfb92d14SAndroid Build Coastguard Worker
1016*cfb92d14SAndroid Build Coastguard Worker        hop_by_hop_data = data.read(hop_by_hop_length)
1017*cfb92d14SAndroid Build Coastguard Worker
1018*cfb92d14SAndroid Build Coastguard Worker        options = self._hop_by_hop_options_factory.parse(io.BytesIO(hop_by_hop_data), message_info)
1019*cfb92d14SAndroid Build Coastguard Worker
1020*cfb92d14SAndroid Build Coastguard Worker        hop_by_hop = HopByHop(next_header, options, hdr_ext_len)
1021*cfb92d14SAndroid Build Coastguard Worker
1022*cfb92d14SAndroid Build Coastguard Worker        message_info.payload_length += len(hop_by_hop)
1023*cfb92d14SAndroid Build Coastguard Worker
1024*cfb92d14SAndroid Build Coastguard Worker        return hop_by_hop
1025*cfb92d14SAndroid Build Coastguard Worker
1026*cfb92d14SAndroid Build Coastguard Worker
1027*cfb92d14SAndroid Build Coastguard Workerclass MPLOptionFactory(PacketFactory):
1028*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces MPL options for HopByHop extension header. """
1029*cfb92d14SAndroid Build Coastguard Worker
1030*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1031*cfb92d14SAndroid Build Coastguard Worker        return MPLOption.from_bytes(data)
1032*cfb92d14SAndroid Build Coastguard Worker
1033*cfb92d14SAndroid Build Coastguard Worker
1034*cfb92d14SAndroid Build Coastguard Workerclass UDPHeaderFactory:
1035*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces UDP header. """
1036*cfb92d14SAndroid Build Coastguard Worker
1037*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1038*cfb92d14SAndroid Build Coastguard Worker        return UDPHeader.from_bytes(data)
1039*cfb92d14SAndroid Build Coastguard Worker
1040*cfb92d14SAndroid Build Coastguard Worker
1041*cfb92d14SAndroid Build Coastguard Workerclass UdpBasedOnSrcDstPortsPayloadFactory:
1042*cfb92d14SAndroid Build Coastguard Worker
1043*cfb92d14SAndroid Build Coastguard Worker    # TODO: Unittests
1044*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces UDP payload. """
1045*cfb92d14SAndroid Build Coastguard Worker
1046*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, src_dst_port_based_payload_factories):
1047*cfb92d14SAndroid Build Coastguard Worker        """
1048*cfb92d14SAndroid Build Coastguard Worker        Args:
1049*cfb92d14SAndroid Build Coastguard Worker            src_dst_port_based_payload_factories (PacketFactory):
1050*cfb92d14SAndroid Build Coastguard Worker                Factories parse UDP payload based on source or
1051*cfb92d14SAndroid Build Coastguard Worker                destination port.
1052*cfb92d14SAndroid Build Coastguard Worker        """
1053*cfb92d14SAndroid Build Coastguard Worker        self._factories = src_dst_port_based_payload_factories
1054*cfb92d14SAndroid Build Coastguard Worker
1055*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1056*cfb92d14SAndroid Build Coastguard Worker        factory = None
1057*cfb92d14SAndroid Build Coastguard Worker
1058*cfb92d14SAndroid Build Coastguard Worker        if message_info.dst_port in self._factories:
1059*cfb92d14SAndroid Build Coastguard Worker            factory = self._factories[message_info.dst_port]
1060*cfb92d14SAndroid Build Coastguard Worker
1061*cfb92d14SAndroid Build Coastguard Worker        if message_info.src_port in self._factories:
1062*cfb92d14SAndroid Build Coastguard Worker            factory = self._factories[message_info.src_port]
1063*cfb92d14SAndroid Build Coastguard Worker
1064*cfb92d14SAndroid Build Coastguard Worker        if message_info.dst_port == common.UDP_TEST_PORT:
1065*cfb92d14SAndroid Build Coastguard Worker            # Ignore traffic targeted to test port
1066*cfb92d14SAndroid Build Coastguard Worker            return None
1067*cfb92d14SAndroid Build Coastguard Worker        elif factory is None:
1068*cfb92d14SAndroid Build Coastguard Worker            raise RuntimeError("Could not find factory to build UDP payload.")
1069*cfb92d14SAndroid Build Coastguard Worker
1070*cfb92d14SAndroid Build Coastguard Worker        return factory.parse(data, message_info)
1071*cfb92d14SAndroid Build Coastguard Worker
1072*cfb92d14SAndroid Build Coastguard Worker
1073*cfb92d14SAndroid Build Coastguard Workerclass UDPDatagramFactory(PacketFactory):
1074*cfb92d14SAndroid Build Coastguard Worker
1075*cfb92d14SAndroid Build Coastguard Worker    # TODO: Unittests
1076*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces UDP datagrams. """
1077*cfb92d14SAndroid Build Coastguard Worker
1078*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, udp_header_factory, udp_payload_factory):
1079*cfb92d14SAndroid Build Coastguard Worker        self._udp_header_factory = udp_header_factory
1080*cfb92d14SAndroid Build Coastguard Worker        self._udp_payload_factory = udp_payload_factory
1081*cfb92d14SAndroid Build Coastguard Worker
1082*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1083*cfb92d14SAndroid Build Coastguard Worker        header = self._udp_header_factory.parse(data, message_info)
1084*cfb92d14SAndroid Build Coastguard Worker
1085*cfb92d14SAndroid Build Coastguard Worker        # Update message payload length: UDP header (8B) + payload length
1086*cfb92d14SAndroid Build Coastguard Worker        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())
1087*cfb92d14SAndroid Build Coastguard Worker
1088*cfb92d14SAndroid Build Coastguard Worker        message_info.src_port = header.src_port
1089*cfb92d14SAndroid Build Coastguard Worker        message_info.dst_port = header.dst_port
1090*cfb92d14SAndroid Build Coastguard Worker
1091*cfb92d14SAndroid Build Coastguard Worker        payload = self._udp_payload_factory.parse(data, message_info)
1092*cfb92d14SAndroid Build Coastguard Worker
1093*cfb92d14SAndroid Build Coastguard Worker        return UDPDatagram(header, payload)
1094*cfb92d14SAndroid Build Coastguard Worker
1095*cfb92d14SAndroid Build Coastguard Worker
1096*cfb92d14SAndroid Build Coastguard Workerclass ICMPv6Factory(PacketFactory):
1097*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces ICMPv6 messages from data. """
1098*cfb92d14SAndroid Build Coastguard Worker
1099*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, body_factories=None):
1100*cfb92d14SAndroid Build Coastguard Worker        self._body_factories = (body_factories if body_factories is not None else {})
1101*cfb92d14SAndroid Build Coastguard Worker
1102*cfb92d14SAndroid Build Coastguard Worker    def _get_icmpv6_body_factory(self, _type):
1103*cfb92d14SAndroid Build Coastguard Worker        try:
1104*cfb92d14SAndroid Build Coastguard Worker            return self._body_factories[_type]
1105*cfb92d14SAndroid Build Coastguard Worker
1106*cfb92d14SAndroid Build Coastguard Worker        except KeyError:
1107*cfb92d14SAndroid Build Coastguard Worker            if "default" not in self._body_factories:
1108*cfb92d14SAndroid Build Coastguard Worker                raise RuntimeError("Could not find specialized factory to parse ICMP body. "
1109*cfb92d14SAndroid Build Coastguard Worker                                   "Unsupported ICMP type: {}".format(_type))
1110*cfb92d14SAndroid Build Coastguard Worker
1111*cfb92d14SAndroid Build Coastguard Worker            default_factory = self._body_factories["default"]
1112*cfb92d14SAndroid Build Coastguard Worker
1113*cfb92d14SAndroid Build Coastguard Worker            print("Could not find specialized factory to parse ICMP body. "
1114*cfb92d14SAndroid Build Coastguard Worker                  "Take the default one: {}".format(type(default_factory)))
1115*cfb92d14SAndroid Build Coastguard Worker
1116*cfb92d14SAndroid Build Coastguard Worker            return default_factory
1117*cfb92d14SAndroid Build Coastguard Worker
1118*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1119*cfb92d14SAndroid Build Coastguard Worker        header = ICMPv6Header.from_bytes(data)
1120*cfb92d14SAndroid Build Coastguard Worker
1121*cfb92d14SAndroid Build Coastguard Worker        factory = self._get_icmpv6_body_factory(header.type)
1122*cfb92d14SAndroid Build Coastguard Worker
1123*cfb92d14SAndroid Build Coastguard Worker        message_info.payload_length += len(header) + (len(data.getvalue()) - data.tell())
1124*cfb92d14SAndroid Build Coastguard Worker
1125*cfb92d14SAndroid Build Coastguard Worker        return ICMPv6(header, factory.parse(data, message_info))
1126*cfb92d14SAndroid Build Coastguard Worker
1127*cfb92d14SAndroid Build Coastguard Worker
1128*cfb92d14SAndroid Build Coastguard Workerclass ICMPv6EchoBodyFactory(PacketFactory):
1129*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces ICMPv6 echo message body. """
1130*cfb92d14SAndroid Build Coastguard Worker
1131*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1132*cfb92d14SAndroid Build Coastguard Worker        return ICMPv6EchoBody.from_bytes(data)
1133*cfb92d14SAndroid Build Coastguard Worker
1134*cfb92d14SAndroid Build Coastguard Worker
1135*cfb92d14SAndroid Build Coastguard Workerclass BytesPayload(ConvertibleToBytes, BuildableFromBytes):
1136*cfb92d14SAndroid Build Coastguard Worker    """ Class representing bytes payload. """
1137*cfb92d14SAndroid Build Coastguard Worker
1138*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, data):
1139*cfb92d14SAndroid Build Coastguard Worker        self.data = data
1140*cfb92d14SAndroid Build Coastguard Worker
1141*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
1142*cfb92d14SAndroid Build Coastguard Worker        return bytearray(self.data)
1143*cfb92d14SAndroid Build Coastguard Worker
1144*cfb92d14SAndroid Build Coastguard Worker    @classmethod
1145*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
1146*cfb92d14SAndroid Build Coastguard Worker        return cls(data)
1147*cfb92d14SAndroid Build Coastguard Worker
1148*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
1149*cfb92d14SAndroid Build Coastguard Worker        return len(self.data)
1150*cfb92d14SAndroid Build Coastguard Worker
1151*cfb92d14SAndroid Build Coastguard Worker
1152*cfb92d14SAndroid Build Coastguard Workerclass BytesPayloadFactory(PacketFactory):
1153*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces bytes payload. """
1154*cfb92d14SAndroid Build Coastguard Worker
1155*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1156*cfb92d14SAndroid Build Coastguard Worker        return BytesPayload(data.read())
1157*cfb92d14SAndroid Build Coastguard Worker
1158*cfb92d14SAndroid Build Coastguard Worker
1159*cfb92d14SAndroid Build Coastguard Workerclass ICMPv6EchoBody(ConvertibleToBytes, BuildableFromBytes):
1160*cfb92d14SAndroid Build Coastguard Worker    """ Class representing body of ICMPv6 echo messages. """
1161*cfb92d14SAndroid Build Coastguard Worker
1162*cfb92d14SAndroid Build Coastguard Worker    _header_length = 4
1163*cfb92d14SAndroid Build Coastguard Worker
1164*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, identifier, sequence_number, data):
1165*cfb92d14SAndroid Build Coastguard Worker        self.identifier = identifier
1166*cfb92d14SAndroid Build Coastguard Worker        self.sequence_number = sequence_number
1167*cfb92d14SAndroid Build Coastguard Worker        self.data = data
1168*cfb92d14SAndroid Build Coastguard Worker
1169*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
1170*cfb92d14SAndroid Build Coastguard Worker        data = struct.pack(">H", self.identifier)
1171*cfb92d14SAndroid Build Coastguard Worker        data += struct.pack(">H", self.sequence_number)
1172*cfb92d14SAndroid Build Coastguard Worker        data += self.data
1173*cfb92d14SAndroid Build Coastguard Worker        return data
1174*cfb92d14SAndroid Build Coastguard Worker
1175*cfb92d14SAndroid Build Coastguard Worker    @classmethod
1176*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
1177*cfb92d14SAndroid Build Coastguard Worker        identifier = struct.unpack(">H", data.read(2))[0]
1178*cfb92d14SAndroid Build Coastguard Worker        sequence_number = struct.unpack(">H", data.read(2))[0]
1179*cfb92d14SAndroid Build Coastguard Worker
1180*cfb92d14SAndroid Build Coastguard Worker        return cls(identifier, sequence_number, data.read())
1181*cfb92d14SAndroid Build Coastguard Worker
1182*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
1183*cfb92d14SAndroid Build Coastguard Worker        return self._header_length + len(self.data)
1184*cfb92d14SAndroid Build Coastguard Worker
1185*cfb92d14SAndroid Build Coastguard Worker
1186*cfb92d14SAndroid Build Coastguard Workerclass ICMPv6DestinationUnreachableFactory(PacketFactory):
1187*cfb92d14SAndroid Build Coastguard Worker    """ Factory that produces ICMPv6 echo message body. """
1188*cfb92d14SAndroid Build Coastguard Worker
1189*cfb92d14SAndroid Build Coastguard Worker    def parse(self, data, message_info):
1190*cfb92d14SAndroid Build Coastguard Worker        return ICMPv6DestinationUnreachable.from_bytes(data)
1191*cfb92d14SAndroid Build Coastguard Worker
1192*cfb92d14SAndroid Build Coastguard Worker
1193*cfb92d14SAndroid Build Coastguard Workerclass ICMPv6DestinationUnreachable(ConvertibleToBytes, BuildableFromBytes):
1194*cfb92d14SAndroid Build Coastguard Worker    """ Class representing body of ICMPv6 Destination Unreachable messages. """
1195*cfb92d14SAndroid Build Coastguard Worker
1196*cfb92d14SAndroid Build Coastguard Worker    _header_length = 4
1197*cfb92d14SAndroid Build Coastguard Worker    _unused = 0
1198*cfb92d14SAndroid Build Coastguard Worker
1199*cfb92d14SAndroid Build Coastguard Worker    def __init__(self, data):
1200*cfb92d14SAndroid Build Coastguard Worker        self.data = data
1201*cfb92d14SAndroid Build Coastguard Worker
1202*cfb92d14SAndroid Build Coastguard Worker    def to_bytes(self):
1203*cfb92d14SAndroid Build Coastguard Worker        data = bytearray(struct.pack(">I", self._unused))
1204*cfb92d14SAndroid Build Coastguard Worker        data += self.data
1205*cfb92d14SAndroid Build Coastguard Worker        return data
1206*cfb92d14SAndroid Build Coastguard Worker
1207*cfb92d14SAndroid Build Coastguard Worker    @classmethod
1208*cfb92d14SAndroid Build Coastguard Worker    def from_bytes(cls, data):
1209*cfb92d14SAndroid Build Coastguard Worker        unused = struct.unpack(">I", data.read(4))[0]
1210*cfb92d14SAndroid Build Coastguard Worker        if unused != 0:
1211*cfb92d14SAndroid Build Coastguard Worker            raise RuntimeError(
1212*cfb92d14SAndroid Build Coastguard Worker                "Invalid value of unused field in the ICMPv6 Destination Unreachable data. Expected value: 0.")
1213*cfb92d14SAndroid Build Coastguard Worker
1214*cfb92d14SAndroid Build Coastguard Worker        return cls(bytearray(data.read()))
1215*cfb92d14SAndroid Build Coastguard Worker
1216*cfb92d14SAndroid Build Coastguard Worker    def __len__(self):
1217*cfb92d14SAndroid Build Coastguard Worker        return self._header_length + len(self.data)
1218