xref: /aosp_15_r20/kernel/tests/net/test/packets.py (revision 2f2c4c7ab4226c71756b9c31670392fdd6887c4f)
#!/usr/bin/python3
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16*2f2c4c7aSAndroid Build Coastguard Worker
17*2f2c4c7aSAndroid Build Coastguard Workerimport random
18*2f2c4c7aSAndroid Build Coastguard Worker
19*2f2c4c7aSAndroid Build Coastguard Workerfrom scapy import all as scapy
20*2f2c4c7aSAndroid Build Coastguard Workerfrom socket import *
21*2f2c4c7aSAndroid Build Coastguard Worker
22*2f2c4c7aSAndroid Build Coastguard Workerimport net_test
23*2f2c4c7aSAndroid Build Coastguard Worker
# TCP header flag bits, combined with | to build scapy TCP "flags" values.
TCP_FIN = 0x01
TCP_SYN = 0x02
TCP_RST = 0x04
TCP_PSH = 0x08
TCP_ACK = 0x10

# Window value placed in the TCP packets built by this module.
TCP_WINDOW = 14400

# MTU reported by the "packet too big" / "fragmentation needed" errors.
PTB_MTU = 1280

# Field values used by the ICMP echo request and echo reply builders.
PING_IDENT = 0xff19
PING_PAYLOAD = b"foobarbaz"
PING_SEQ = 3
PING_TOS = 0x83
38*2f2c4c7aSAndroid Build Coastguard Worker
# Local alias so the builders below can refer to the net_test payload by a
# short name.
UDP_PAYLOAD = net_test.UDP_PAYLOAD
41*2f2c4c7aSAndroid Build Coastguard Worker
42*2f2c4c7aSAndroid Build Coastguard Worker
def _RandomPort():
  """Returns a random port number between 1025 and 65535, inclusive."""
  lowest, highest = 1025, 65535
  return random.randint(lowest, highest)
45*2f2c4c7aSAndroid Build Coastguard Worker
def _GetIpLayer(version):
  """Returns the scapy IP layer class to use for the given version.

  Versions 4 and 5 both map to scapy.IP and version 6 maps to scapy.IPv6.
  Any other version raises KeyError.
  """
  layer_by_version = {
      4: scapy.IP,
      5: scapy.IP,
      6: scapy.IPv6,
  }
  return layer_by_version[version]
48*2f2c4c7aSAndroid Build Coastguard Worker
def _SetPacketTos(packet, tos):
  """Sets the IPv6 traffic class or IPv4 ToS field of a packet in place.

  Args:
    packet: A scapy.IPv6 or scapy.IP packet object.
    tos: The value to store in the tc (IPv6) or tos (IPv4) field.

  Raises:
    ValueError: If packet is neither a scapy.IPv6 nor a scapy.IP instance.
  """
  if isinstance(packet, scapy.IPv6):
    packet.tc = tos
    return
  if isinstance(packet, scapy.IP):
    packet.tos = tos
    return
  raise ValueError("Can't find ToS Field")
56*2f2c4c7aSAndroid Build Coastguard Worker
def UDP(version, srcaddr, dstaddr, sport=0):
  """Builds a UDP packet to port 53 carrying UDP_PAYLOAD.

  Args:
    version: IP version, as accepted by _GetIpLayer.
    srcaddr: Source IP address.
    dstaddr: Destination IP address.
    sport: Source port. 0 selects a random port; None is handed to scapy
      unchanged.

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)
  # Compare against 0 explicitly instead of testing truthiness: None is a
  # meaningful value (unspecified) and must not be replaced.
  source_port = _RandomPort() if sport == 0 else sport
  description = "UDPv%d packet" % version
  header = network_layer(src=srcaddr, dst=dstaddr)
  datagram = scapy.UDP(sport=source_port, dport=53)
  return (description, header / datagram / UDP_PAYLOAD)
65*2f2c4c7aSAndroid Build Coastguard Worker
def UDPWithOptions(version, srcaddr, dstaddr, sport=0, lifetime=39):
  """Builds a UDP packet to port 53 with non-default IP header fields.

  The IPv4 packet carries ToS 0x83 and the given TTL. The IPv6 packet carries
  traffic class 0x83, flow label 0xbeef and the given hop limit.

  Args:
    version: IP version. 4 builds an IPv4 packet; any other value builds an
      IPv6 packet.
    srcaddr: Source IP address.
    dstaddr: Destination IP address.
    sport: Source port. 0 selects a random port, as in UDP() and SYN(); None
      is handed to scapy unchanged.
    lifetime: Value for the IPv4 TTL or the IPv6 hop limit.

  Returns:
    A (description, packet) tuple.
  """
  # Treat 0 as "pick a random port", matching UDP() and SYN(). Compare with
  # == rather than truthiness so that None (unspecified) is left alone.
  if sport == 0:
    sport = _RandomPort()
  if version == 4:
    packet = (scapy.IP(src=srcaddr, dst=dstaddr, ttl=lifetime, tos=0x83) /
              scapy.UDP(sport=sport, dport=53) /
              UDP_PAYLOAD)
  else:
    packet = (scapy.IPv6(src=srcaddr, dst=dstaddr,
                         fl=0xbeef, hlim=lifetime, tc=0x83) /
              scapy.UDP(sport=sport, dport=53) /
              UDP_PAYLOAD)
  return ("UDPv%d packet with options" % version, packet)
77*2f2c4c7aSAndroid Build Coastguard Worker
def SYN(dport, version, srcaddr, dstaddr, sport=0, seq=-1):
  """Builds a TCP SYN packet.

  Args:
    dport: Destination port.
    version: IP version, as accepted by _GetIpLayer.
    srcaddr: Source IP address.
    dstaddr: Destination IP address.
    sport: Source port. 0 selects a random port.
    seq: Sequence number. -1 selects a random 32-bit value; None is handed
      to scapy unchanged.

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)
  source_port = _RandomPort() if sport == 0 else sport
  # -1 is the "choose for me" marker because None already means unspecified.
  initial_seq = random.getrandbits(32) if seq == -1 else seq
  header = network_layer(src=srcaddr, dst=dstaddr)
  segment = scapy.TCP(sport=source_port, dport=dport,
                      seq=initial_seq, ack=0,
                      flags=TCP_SYN, window=TCP_WINDOW)
  return ("TCP SYN", header / segment)
89*2f2c4c7aSAndroid Build Coastguard Worker
def RST(version, srcaddr, dstaddr, packet, sent_fin=False):
  """Builds a TCP RST+ACK packet answering the given packet.

  Ports are swapped relative to the TCP layer of packet. The acknowledgement
  number is that layer's sequence number, plus one if it had SYN or FIN set.
  The sequence number is that layer's acknowledgement number plus sent_fin.

  Args:
    version: IP version, as accepted by _GetIpLayer.
    srcaddr: Source IP address of the RST.
    dstaddr: Destination IP address of the RST.
    packet: A scapy packet containing a TCP layer.
    sent_fin: Added to the sequence number (True counts as one).

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)
  incoming = packet.getlayer("TCP")
  seq_consuming_flags = TCP_SYN | TCP_FIN
  was_syn_or_fin = (incoming.flags & seq_consuming_flags) != 0
  header = network_layer(src=srcaddr, dst=dstaddr)
  segment = scapy.TCP(sport=incoming.dport, dport=incoming.sport,
                      ack=incoming.seq + was_syn_or_fin,
                      seq=incoming.ack + sent_fin,
                      flags=TCP_RST | TCP_ACK, window=TCP_WINDOW)
  return ("TCP RST", header / segment)
100*2f2c4c7aSAndroid Build Coastguard Worker
def SYNACK(version, srcaddr, dstaddr, packet):
  """Builds a TCP SYN+ACK packet answering the given packet.

  Ports are swapped relative to the TCP layer of packet and the
  acknowledgement number is that layer's sequence number plus one. The
  sequence number and window are passed to scapy as None.

  Args:
    version: IP version, as accepted by _GetIpLayer.
    srcaddr: Source IP address of the SYN+ACK.
    dstaddr: Destination IP address of the SYN+ACK.
    packet: A scapy packet containing a TCP layer.

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)
  incoming = packet.getlayer("TCP")
  header = network_layer(src=srcaddr, dst=dstaddr)
  segment = scapy.TCP(sport=incoming.dport, dport=incoming.sport,
                      ack=incoming.seq + 1, seq=None,
                      flags=TCP_SYN | TCP_ACK, window=None)
  return ("TCP SYN+ACK", header / segment)
109*2f2c4c7aSAndroid Build Coastguard Worker
def ACK(version, srcaddr, dstaddr, packet, payload=b""):
  """Builds a TCP ACK, optionally carrying data, answering the given packet.

  The acknowledgement number is the sequence number of the TCP layer of
  packet, plus one if that layer had SYN or FIN set, plus the length of its
  payload. With a non-empty payload the PSH flag is also set and the
  description becomes "TCP data".

  Args:
    version: IP version, as accepted by _GetIpLayer.
    srcaddr: Source IP address of the ACK.
    dstaddr: Destination IP address of the ACK.
    packet: A scapy packet containing a TCP layer.
    payload: Data to append after the TCP header.

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)
  incoming = packet.getlayer("TCP")
  was_syn_or_fin = (incoming.flags & (TCP_SYN | TCP_FIN)) != 0
  ack_delta = was_syn_or_fin + len(incoming.payload)
  if payload:
    desc, flags = "TCP data", TCP_ACK | TCP_PSH
  else:
    desc, flags = "TCP ACK", TCP_ACK
  header = network_layer(src=srcaddr, dst=dstaddr)
  segment = scapy.TCP(sport=incoming.dport, dport=incoming.sport,
                      ack=incoming.seq + ack_delta, seq=incoming.ack,
                      flags=flags, window=TCP_WINDOW)
  return (desc, header / segment / payload)
123*2f2c4c7aSAndroid Build Coastguard Worker
def FIN(version, srcaddr, dstaddr, packet):
  """Builds a TCP FIN+ACK packet answering the given packet.

  The acknowledgement number is the sequence number of the TCP layer of
  packet, plus one if that layer had SYN or FIN set, plus the length of its
  payload. The sequence number is that layer's acknowledgement number.

  Args:
    version: IP version, as accepted by _GetIpLayer.
    srcaddr: Source IP address of the FIN.
    dstaddr: Destination IP address of the FIN.
    packet: A scapy packet containing a TCP layer.

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)
  incoming = packet.getlayer("TCP")
  was_syn_or_fin = (incoming.flags & (TCP_SYN | TCP_FIN)) != 0
  ack_delta = was_syn_or_fin + len(incoming.payload)
  header = network_layer(src=srcaddr, dst=dstaddr)
  segment = scapy.TCP(sport=incoming.dport, dport=incoming.sport,
                      ack=incoming.seq + ack_delta, seq=incoming.ack,
                      flags=TCP_ACK | TCP_FIN, window=TCP_WINDOW)
  return ("TCP FIN", header / segment)
134*2f2c4c7aSAndroid Build Coastguard Worker
def GRE(version, srcaddr, dstaddr, proto, packet):
  """Wraps a packet in a GRE header inside an IPv4 or IPv6 header.

  Args:
    version: IP version of the outer header. 4 selects IPv4; any other value
      selects IPv6.
    srcaddr: Outer source IP address.
    dstaddr: Outer destination IP address.
    proto: Value for the GRE header's proto field.
    packet: The packet to encapsulate.

  Returns:
    A (description, packet) tuple.
  """
  outer = (scapy.IP(src=srcaddr, dst=dstaddr, proto=net_test.IPPROTO_GRE)
           if version == 4 else
           scapy.IPv6(src=srcaddr, dst=dstaddr, nh=net_test.IPPROTO_GRE))
  return ("GRE packet", outer / scapy.GRE(proto=proto) / packet)
142*2f2c4c7aSAndroid Build Coastguard Worker
def ICMPPortUnreachable(version, srcaddr, dstaddr, packet):
  """Builds an ICMP port unreachable error that quotes the given packet.

  Args:
    version: IP version. 4 builds an ICMPv4 error (type 3, code 3); any other
      value builds an ICMPv6 destination unreachable error with code 4.
    srcaddr: Source IP address of the error.
    dstaddr: Destination IP address of the error.
    packet: The packet to append after the ICMP header.

  Returns:
    A (description, packet) tuple.
  """
  if version != 4:
    header = scapy.IPv6(src=srcaddr, dst=dstaddr)
    error = scapy.ICMPv6DestUnreach(code=4)
    return ("ICMPv6 port unreachable", header / error / packet)

  # ToS is 0xc0 because Linux hardcodes the ToS of ICMP errors to 0xc0 or
  # greater, following RFC 1812 section 4.3.2.5.
  header = scapy.IP(src=srcaddr, dst=dstaddr, proto=1, tos=0xc0)
  error = scapy.ICMPerror(type=3, code=3)
  return ("ICMPv4 port unreachable", header / error / packet)
154*2f2c4c7aSAndroid Build Coastguard Worker
def ICMPPacketTooBig(version, srcaddr, dstaddr, packet):
  """Builds an ICMP error reporting that the given packet was too big.

  The reported MTU is PTB_MTU. The quoted packet is truncated to 64 bytes
  for IPv4 and to 1232 bytes for IPv6.

  Args:
    version: IP version. 4 builds an ICMPv4 fragmentation needed error
      (type 3, code 4); any other value builds an ICMPv6 Packet Too Big.
    srcaddr: Source IP address of the error.
    dstaddr: Destination IP address of the error.
    packet: The packet to quote in the error.

  Returns:
    A (description, packet) tuple.
  """
  if version != 4:
    # NOTE(review): 1232 is presumably PTB_MTU minus the IPv6 and ICMPv6
    # header sizes - confirm.
    quoted = bytes(packet)[:1232]
    return ("ICMPv6 Packet Too Big",
            scapy.IPv6(src=srcaddr, dst=dstaddr) /
            scapy.ICMPv6PacketTooBig(mtu=PTB_MTU) / quoted)

  desc = "ICMPv4 fragmentation needed"
  pkt = (scapy.IP(src=srcaddr, dst=dstaddr, proto=1) /
         scapy.ICMPerror(type=3, code=4) / bytes(packet)[:64])
  # Since RFC 1191 the last two bytes of a fragmentation needed error hold
  # the MTU, but only newer scapy versions expose them as "nexthopmtu";
  # older ones only have the "unused" field.
  mtu_field = "nexthopmtu" if hasattr(scapy.ICMP, "nexthopmtu") else "unused"
  setattr(pkt[scapy.ICMPerror], mtu_field, PTB_MTU)
  return desc, pkt
171*2f2c4c7aSAndroid Build Coastguard Worker
def ICMPEcho(version, srcaddr, dstaddr):
  """Builds an ICMP echo request using the PING_* constants.

  Args:
    version: IP version; must be 4 or 6, otherwise KeyError is raised.
    srcaddr: Source IP address.
    dstaddr: Destination IP address.

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)
  echo_by_version = {
      4: scapy.ICMP,
      6: scapy.ICMPv6EchoRequest,
  }
  echo_layer = echo_by_version[version]
  header = network_layer(src=srcaddr, dst=dstaddr)
  request = header / echo_layer(id=PING_IDENT, seq=PING_SEQ) / PING_PAYLOAD
  _SetPacketTos(request, PING_TOS)
  return ("ICMPv%d echo" % version, request)
179*2f2c4c7aSAndroid Build Coastguard Worker
def ICMPReply(version, srcaddr, dstaddr, packet):
  """Builds an ICMP echo reply using the PING_* constants.

  Args:
    version: IP version; must be 4 or 6, otherwise KeyError is raised.
    srcaddr: Source IP address.
    dstaddr: Destination IP address.
    packet: Not read; the reply is built only from the PING_* constants.

  Returns:
    A (description, packet) tuple.
  """
  network_layer = _GetIpLayer(version)

  # Scapy has no dedicated ICMPv4 echo reply constructor, so build one from
  # scapy.ICMP with type 0.
  def _EchoReplyV4(**kwargs):
    return scapy.ICMP(type=0, **kwargs)

  reply_by_version = {
      4: _EchoReplyV4,
      6: scapy.ICMPv6EchoReply,
  }
  reply_layer = reply_by_version[version]
  header = network_layer(src=srcaddr, dst=dstaddr)
  reply = header / reply_layer(id=PING_IDENT, seq=PING_SEQ) / PING_PAYLOAD
  _SetPacketTos(reply, PING_TOS)
  return ("ICMPv%d echo reply" % version, reply)
189*2f2c4c7aSAndroid Build Coastguard Worker
def NS(srcaddr, tgtaddr, srcmac):
  """Builds an ICMPv6 neighbour solicitation for tgtaddr.

  The destination is an ff02::1:ffXX:XXXX address whose last three bytes are
  taken from tgtaddr.

  Args:
    srcaddr: Source IPv6 address.
    tgtaddr: IPv6 address being solicited.
    srcmac: Link-layer address for the source link-layer address option.

  Returns:
    A (description, packet) tuple.
  """
  packed_target = inet_pton(AF_INET6, tgtaddr)
  low_bytes_hex = tuple(map(net_test.ByteToHex, packed_target[-3:]))
  destination = "ff02::1:ff%s:%s%s" % low_bytes_hex
  header = scapy.IPv6(src=srcaddr, dst=destination)
  solicitation = scapy.ICMPv6ND_NS(tgt=tgtaddr)
  lladdr_option = scapy.ICMPv6NDOptSrcLLAddr(lladdr=srcmac)
  return ("ICMPv6 NS", header / solicitation / lladdr_option)
198*2f2c4c7aSAndroid Build Coastguard Worker
def NA(srcaddr, dstaddr, srcmac):
  """Builds an ICMPv6 neighbour advertisement for srcaddr.

  The advertisement has R=0, S=1 and O=1 and carries srcmac in a destination
  link-layer address option.

  Args:
    srcaddr: Source IPv6 address, also used as the advertised target.
    dstaddr: Destination IPv6 address.
    srcmac: Link-layer address to advertise.

  Returns:
    A (description, packet) tuple.
  """
  header = scapy.IPv6(src=srcaddr, dst=dstaddr)
  advertisement = scapy.ICMPv6ND_NA(tgt=srcaddr, R=0, S=1, O=1)
  lladdr_option = scapy.ICMPv6NDOptDstLLAddr(lladdr=srcmac)
  return ("ICMPv6 NA", header / advertisement / lladdr_option)
204