1# This file is part of Scapy 2## See http://www.secdev.org/projects/scapy for more informations 3## Copyright (C) Sabrina Dubroca <[email protected]> 4## This program is published under a GPLv2 license 5 6""" 7Classes and functions for MACsec. 8""" 9 10from __future__ import absolute_import 11from __future__ import print_function 12import struct 13 14from scapy.config import conf 15from scapy.fields import * 16from scapy.packet import Packet, Raw, bind_layers 17from scapy.layers.l2 import Ether, Dot1AD, Dot1Q 18from scapy.layers.eap import MACsecSCI 19from scapy.layers.inet import IP 20from scapy.layers.inet6 import IPv6 21import scapy.modules.six as six 22 23if conf.crypto_valid: 24 from cryptography.exceptions import InvalidTag 25 from cryptography.hazmat.backends import default_backend 26 from cryptography.hazmat.primitives.ciphers import ( 27 Cipher, 28 algorithms, 29 modes, 30 ) 31else: 32 log_loading.info("Can't import python-cryptography v1.7+. " 33 "Disabled MACsec encryption/authentication.") 34 35 36NOSCI_LEN = 14 + 6 37SCI_LEN = 8 38DEFAULT_ICV_LEN = 16 39 40 41class MACsecSA(object): 42 """Representation of a MACsec Secure Association 43 44 Provides encapsulation, decapsulation, encryption, and decryption 45 of MACsec frames 46 """ 47 def __init__(self, sci, an, pn, key, icvlen, encrypt, send_sci): 48 if isinstance(sci, six.integer_types): 49 self.sci = struct.pack('!Q', sci) 50 elif isinstance(sci, bytes): 51 self.sci = sci 52 else: 53 raise TypeError("SCI must be either bytes or int") 54 self.an = an 55 self.pn = pn 56 self.key = key 57 self.icvlen = icvlen 58 self.do_encrypt = encrypt 59 self.send_sci = send_sci 60 61 def make_iv(self, pkt): 62 """generate an IV for the packet""" 63 return self.sci + struct.pack('!I', pkt[MACsec].pn) 64 65 @staticmethod 66 def split_pkt(pkt, assoclen, icvlen=0): 67 """ 68 split the packet into associated data, plaintext or ciphertext, and 69 optional ICV 70 """ 71 data = raw(pkt) 72 assoc = data[:assoclen] 73 if icvlen: 74 icv = data[-icvlen:] 75 enc = data[assoclen:-icvlen] 76 else: 77 icv = b'' 78 enc = data[assoclen:] 79 return assoc, enc, icv 80 81 def e_bit(self): 82 """returns the value of the E bit for packets sent through this SA""" 83 return self.do_encrypt 84 85 def c_bit(self): 86 """returns the value of the C bit for packets sent through this SA""" 87 return self.do_encrypt or self.icvlen != DEFAULT_ICV_LEN 88 89 @staticmethod 90 def shortlen(pkt): 91 """determine shortlen for a raw packet (not encapsulated yet)""" 92 datalen = len(pkt) - 2*6 93 if datalen < 48: 94 return datalen 95 return 0 96 97 def encap(self, pkt): 98 """encapsulate a frame using this Secure Association""" 99 if pkt.name != Ether().name: 100 raise TypeError('cannot encapsulate packet in MACsec, must be Ethernet') 101 hdr = copy.deepcopy(pkt) 102 payload = hdr.payload 103 del hdr.payload 104 tag = MACsec(sci=self.sci, an=self.an, 105 SC=self.send_sci, 106 E=self.e_bit(), C=self.c_bit(), 107 shortlen=MACsecSA.shortlen(pkt), 108 pn=self.pn, type=pkt.type) 109 hdr.type = ETH_P_MACSEC 110 return hdr/tag/payload 111 112 # this doesn't really need to be a method, but for symmetry with 113 # encap(), it is 114 def decap(self, orig_pkt): 115 """decapsulate a MACsec frame""" 116 if orig_pkt.name != Ether().name or orig_pkt.payload.name != MACsec().name: 117 raise TypeError('cannot decapsulate MACsec packet, must be Ethernet/MACsec') 118 packet = copy.deepcopy(orig_pkt) 119 prev_layer = packet[MACsec].underlayer 120 prev_layer.type = packet[MACsec].type 121 next_layer = packet[MACsec].payload 122 del prev_layer.payload 123 if prev_layer.name == Ether().name: 124 return Ether(raw(prev_layer/next_layer)) 125 return prev_layer/next_layer 126 127 def encrypt(self, orig_pkt, assoclen=None): 128 """encrypt a MACsec frame for this Secure Association""" 129 hdr = copy.deepcopy(orig_pkt) 130 del hdr[MACsec].payload 131 del hdr[MACsec].type 132 pktlen = len(orig_pkt) 133 if self.send_sci: 134 hdrlen = NOSCI_LEN + SCI_LEN 135 else: 136 hdrlen = NOSCI_LEN 137 if assoclen is None or not self.do_encrypt: 138 if self.do_encrypt: 139 assoclen = hdrlen 140 else: 141 assoclen = pktlen 142 iv = self.make_iv(orig_pkt) 143 assoc, pt, _ = MACsecSA.split_pkt(orig_pkt, assoclen) 144 encryptor = Cipher( 145 algorithms.AES(self.key), 146 modes.GCM(iv), 147 backend=default_backend() 148 ).encryptor() 149 encryptor.authenticate_additional_data(assoc) 150 ct = encryptor.update(pt) + encryptor.finalize() 151 hdr[MACsec].payload = Raw(assoc[hdrlen:assoclen] + ct + encryptor.tag) 152 return hdr 153 154 def decrypt(self, orig_pkt, assoclen=None): 155 """decrypt a MACsec frame for this Secure Association""" 156 hdr = copy.deepcopy(orig_pkt) 157 del hdr[MACsec].payload 158 pktlen = len(orig_pkt) 159 if self.send_sci: 160 hdrlen = NOSCI_LEN + SCI_LEN 161 else: 162 hdrlen = NOSCI_LEN 163 if assoclen is None or not self.do_encrypt: 164 if self.do_encrypt: 165 assoclen = hdrlen 166 else: 167 assoclen = pktlen - self.icvlen 168 iv = self.make_iv(hdr) 169 assoc, ct, icv = MACsecSA.split_pkt(orig_pkt, assoclen, self.icvlen) 170 decryptor = Cipher( 171 algorithms.AES(self.key), 172 modes.GCM(iv, icv), 173 backend=default_backend() 174 ).decryptor() 175 decryptor.authenticate_additional_data(assoc) 176 pt = assoc[hdrlen:assoclen] 177 pt += decryptor.update(ct) 178 pt += decryptor.finalize() 179 hdr[MACsec].type = struct.unpack('!H', pt[0:2])[0] 180 hdr[MACsec].payload = Raw(pt[2:]) 181 return hdr 182 183 184class MACsec(Packet): 185 """representation of one MACsec frame""" 186 name = '802.1AE' 187 fields_desc = [BitField('Ver', 0, 1), 188 BitField('ES', 0, 1), 189 BitField('SC', 0, 1), 190 BitField('SCB', 0, 1), 191 BitField('E', 0, 1), 192 BitField('C', 0, 1), 193 BitField('an', 0, 2), 194 BitField('reserved', 0, 2), 195 BitField('shortlen', 0, 6), 196 IntField("pn", 1), 197 ConditionalField(PacketField("sci", None, MACsecSCI), lambda pkt: pkt.SC), 198 ConditionalField(XShortEnumField("type", None, ETHER_TYPES), 199 lambda pkt: pkt.type is not None)] 200 201 def mysummary(self): 202 summary = self.sprintf("an=%MACsec.an%, pn=%MACsec.pn%") 203 if self.SC: 204 summary += self.sprintf(", sci=%MACsec.sci%") 205 if self.type is not None: 206 summary += self.sprintf(", %MACsec.type%") 207 return summary 208 209 210bind_layers(MACsec, IP, type=ETH_P_IP) 211bind_layers(MACsec, IPv6, type=ETH_P_IPV6) 212 213bind_layers( Dot1AD, MACsec, type=ETH_P_MACSEC) 214bind_layers( Dot1Q, MACsec, type=ETH_P_MACSEC) 215bind_layers( Ether, MACsec, type=ETH_P_MACSEC) 216