xref: /aosp_15_r20/external/scapy/scapy/contrib/macsec.py (revision 7dc08ffc4802948ccbc861daaf1e81c405c2c4bd)
1# This file is part of Scapy
2## See http://www.secdev.org/projects/scapy for more informations
3## Copyright (C) Sabrina Dubroca <[email protected]>
4## This program is published under a GPLv2 license
5
6"""
7Classes and functions for MACsec.
8"""
9
10from __future__ import absolute_import
11from __future__ import print_function
12import struct
13
14from scapy.config import conf
15from scapy.fields import *
16from scapy.packet import Packet, Raw, bind_layers
17from scapy.layers.l2 import Ether, Dot1AD, Dot1Q
18from scapy.layers.eap import MACsecSCI
19from scapy.layers.inet import IP
20from scapy.layers.inet6 import IPv6
21import scapy.modules.six as six
22
23if conf.crypto_valid:
24    from cryptography.exceptions import InvalidTag
25    from cryptography.hazmat.backends import default_backend
26    from cryptography.hazmat.primitives.ciphers import (
27        Cipher,
28        algorithms,
29        modes,
30    )
31else:
32    log_loading.info("Can't import python-cryptography v1.7+. "
33                     "Disabled MACsec encryption/authentication.")
34
35
36NOSCI_LEN = 14 + 6
37SCI_LEN = 8
38DEFAULT_ICV_LEN = 16
39
40
41class MACsecSA(object):
42    """Representation of a MACsec Secure Association
43
44    Provides encapsulation, decapsulation, encryption, and decryption
45    of MACsec frames
46    """
47    def __init__(self, sci, an, pn, key, icvlen, encrypt, send_sci):
48        if isinstance(sci, six.integer_types):
49            self.sci = struct.pack('!Q', sci)
50        elif isinstance(sci, bytes):
51            self.sci = sci
52        else:
53            raise TypeError("SCI must be either bytes or int")
54        self.an = an
55        self.pn = pn
56        self.key = key
57        self.icvlen = icvlen
58        self.do_encrypt = encrypt
59        self.send_sci = send_sci
60
61    def make_iv(self, pkt):
62        """generate an IV for the packet"""
63        return self.sci + struct.pack('!I', pkt[MACsec].pn)
64
65    @staticmethod
66    def split_pkt(pkt, assoclen, icvlen=0):
67        """
68        split the packet into associated data, plaintext or ciphertext, and
69        optional ICV
70        """
71        data = raw(pkt)
72        assoc = data[:assoclen]
73        if icvlen:
74            icv = data[-icvlen:]
75            enc = data[assoclen:-icvlen]
76        else:
77            icv = b''
78            enc = data[assoclen:]
79        return assoc, enc, icv
80
81    def e_bit(self):
82        """returns the value of the E bit for packets sent through this SA"""
83        return self.do_encrypt
84
85    def c_bit(self):
86        """returns the value of the C bit for packets sent through this SA"""
87        return self.do_encrypt or self.icvlen != DEFAULT_ICV_LEN
88
89    @staticmethod
90    def shortlen(pkt):
91        """determine shortlen for a raw packet (not encapsulated yet)"""
92        datalen = len(pkt) - 2*6
93        if datalen < 48:
94            return datalen
95        return 0
96
97    def encap(self, pkt):
98        """encapsulate a frame using this Secure Association"""
99        if pkt.name != Ether().name:
100            raise TypeError('cannot encapsulate packet in MACsec, must be Ethernet')
101        hdr = copy.deepcopy(pkt)
102        payload = hdr.payload
103        del hdr.payload
104        tag = MACsec(sci=self.sci, an=self.an,
105                     SC=self.send_sci,
106                     E=self.e_bit(), C=self.c_bit(),
107                     shortlen=MACsecSA.shortlen(pkt),
108                     pn=self.pn, type=pkt.type)
109        hdr.type = ETH_P_MACSEC
110        return hdr/tag/payload
111
112    # this doesn't really need to be a method, but for symmetry with
113    # encap(), it is
114    def decap(self, orig_pkt):
115        """decapsulate a MACsec frame"""
116        if orig_pkt.name != Ether().name or orig_pkt.payload.name != MACsec().name:
117            raise TypeError('cannot decapsulate MACsec packet, must be Ethernet/MACsec')
118        packet = copy.deepcopy(orig_pkt)
119        prev_layer = packet[MACsec].underlayer
120        prev_layer.type = packet[MACsec].type
121        next_layer = packet[MACsec].payload
122        del prev_layer.payload
123        if prev_layer.name == Ether().name:
124            return Ether(raw(prev_layer/next_layer))
125        return prev_layer/next_layer
126
127    def encrypt(self, orig_pkt, assoclen=None):
128        """encrypt a MACsec frame for this Secure Association"""
129        hdr = copy.deepcopy(orig_pkt)
130        del hdr[MACsec].payload
131        del hdr[MACsec].type
132        pktlen = len(orig_pkt)
133        if self.send_sci:
134            hdrlen = NOSCI_LEN + SCI_LEN
135        else:
136            hdrlen = NOSCI_LEN
137        if assoclen is None or not self.do_encrypt:
138            if self.do_encrypt:
139                assoclen = hdrlen
140            else:
141                assoclen = pktlen
142        iv = self.make_iv(orig_pkt)
143        assoc, pt, _ = MACsecSA.split_pkt(orig_pkt, assoclen)
144        encryptor = Cipher(
145            algorithms.AES(self.key),
146            modes.GCM(iv),
147            backend=default_backend()
148        ).encryptor()
149        encryptor.authenticate_additional_data(assoc)
150        ct = encryptor.update(pt) + encryptor.finalize()
151        hdr[MACsec].payload = Raw(assoc[hdrlen:assoclen] + ct + encryptor.tag)
152        return hdr
153
154    def decrypt(self, orig_pkt, assoclen=None):
155        """decrypt a MACsec frame for this Secure Association"""
156        hdr = copy.deepcopy(orig_pkt)
157        del hdr[MACsec].payload
158        pktlen = len(orig_pkt)
159        if self.send_sci:
160            hdrlen = NOSCI_LEN + SCI_LEN
161        else:
162            hdrlen = NOSCI_LEN
163        if assoclen is None or not self.do_encrypt:
164            if self.do_encrypt:
165                assoclen = hdrlen
166            else:
167                assoclen = pktlen - self.icvlen
168        iv = self.make_iv(hdr)
169        assoc, ct, icv = MACsecSA.split_pkt(orig_pkt, assoclen, self.icvlen)
170        decryptor = Cipher(
171               algorithms.AES(self.key),
172               modes.GCM(iv, icv),
173               backend=default_backend()
174           ).decryptor()
175        decryptor.authenticate_additional_data(assoc)
176        pt = assoc[hdrlen:assoclen]
177        pt += decryptor.update(ct)
178        pt += decryptor.finalize()
179        hdr[MACsec].type = struct.unpack('!H', pt[0:2])[0]
180        hdr[MACsec].payload = Raw(pt[2:])
181        return hdr
182
183
184class MACsec(Packet):
185    """representation of one MACsec frame"""
186    name = '802.1AE'
187    fields_desc = [BitField('Ver', 0, 1),
188                   BitField('ES', 0, 1),
189                   BitField('SC', 0, 1),
190                   BitField('SCB', 0, 1),
191                   BitField('E', 0, 1),
192                   BitField('C', 0, 1),
193                   BitField('an', 0, 2),
194                   BitField('reserved', 0, 2),
195                   BitField('shortlen', 0, 6),
196                   IntField("pn", 1),
197                   ConditionalField(PacketField("sci", None, MACsecSCI), lambda pkt: pkt.SC),
198                   ConditionalField(XShortEnumField("type", None, ETHER_TYPES),
199                                    lambda pkt: pkt.type is not None)]
200
201    def mysummary(self):
202        summary = self.sprintf("an=%MACsec.an%, pn=%MACsec.pn%")
203        if self.SC:
204            summary += self.sprintf(", sci=%MACsec.sci%")
205        if self.type is not None:
206            summary += self.sprintf(", %MACsec.type%")
207        return summary
208
209
210bind_layers(MACsec, IP, type=ETH_P_IP)
211bind_layers(MACsec, IPv6, type=ETH_P_IPV6)
212
213bind_layers( Dot1AD,        MACsec,        type=ETH_P_MACSEC)
214bind_layers( Dot1Q,         MACsec,        type=ETH_P_MACSEC)
215bind_layers( Ether,         MACsec,        type=ETH_P_MACSEC)
216