1*7dc08ffcSJunyu Lai## This file is part of Scapy 2*7dc08ffcSJunyu Lai## Copyright (C) 2008 Arnaud Ebalard <[email protected]> 3*7dc08ffcSJunyu Lai## <[email protected]> 4*7dc08ffcSJunyu Lai## 2015, 2016, 2017 Maxence Tury <[email protected]> 5*7dc08ffcSJunyu Lai## This program is published under a GPLv2 license 6*7dc08ffcSJunyu Lai 7*7dc08ffcSJunyu Lai""" 8*7dc08ffcSJunyu LaiHigh-level methods for PKI objects (X.509 certificates, CRLs, asymmetric keys). 9*7dc08ffcSJunyu LaiSupports both RSA and ECDSA objects. 10*7dc08ffcSJunyu Lai 11*7dc08ffcSJunyu LaiThe classes below are wrappers for the ASN.1 objects defined in x509.py. 12*7dc08ffcSJunyu LaiBy collecting their attributes, we bypass the ASN.1 structure, hence 13*7dc08ffcSJunyu Laithere is no direct method for exporting a new full DER-encoded version 14*7dc08ffcSJunyu Laiof a Cert instance after its serial has been modified (for example). 15*7dc08ffcSJunyu LaiIf you need to modify an import, just use the corresponding ASN1_Packet. 16*7dc08ffcSJunyu Lai 17*7dc08ffcSJunyu LaiFor instance, here is what you could do in order to modify the serial of 18*7dc08ffcSJunyu Lai'cert' and then resign it with whatever 'key': 19*7dc08ffcSJunyu Lai f = open('cert.der') 20*7dc08ffcSJunyu Lai c = X509_Cert(f.read()) 21*7dc08ffcSJunyu Lai c.tbsCertificate.serialNumber = 0x4B1D 22*7dc08ffcSJunyu Lai k = PrivKey('key.pem') 23*7dc08ffcSJunyu Lai new_x509_cert = k.resignCert(c) 24*7dc08ffcSJunyu LaiNo need for obnoxious openssl tweaking anymore. :) 25*7dc08ffcSJunyu Lai""" 26*7dc08ffcSJunyu Lai 27*7dc08ffcSJunyu Laifrom __future__ import absolute_import 28*7dc08ffcSJunyu Laifrom __future__ import print_function 29*7dc08ffcSJunyu Laiimport base64 30*7dc08ffcSJunyu Laiimport os 31*7dc08ffcSJunyu Laiimport time 32*7dc08ffcSJunyu Lai 33*7dc08ffcSJunyu Laifrom scapy.config import conf, crypto_validator 34*7dc08ffcSJunyu Laiimport scapy.modules.six as six 35*7dc08ffcSJunyu Laifrom scapy.modules.six.moves import range 36*7dc08ffcSJunyu Laiif conf.crypto_valid: 37*7dc08ffcSJunyu Lai from cryptography.hazmat.backends import default_backend 38*7dc08ffcSJunyu Lai from cryptography.hazmat.primitives import serialization 39*7dc08ffcSJunyu Lai from cryptography.hazmat.primitives.asymmetric import rsa 40*7dc08ffcSJunyu Lai 41*7dc08ffcSJunyu Laifrom scapy.error import warning 42*7dc08ffcSJunyu Laifrom scapy.utils import binrepr 43*7dc08ffcSJunyu Laifrom scapy.asn1.asn1 import ASN1_BIT_STRING 44*7dc08ffcSJunyu Laifrom scapy.asn1.mib import hash_by_oid 45*7dc08ffcSJunyu Laifrom scapy.layers.x509 import (X509_SubjectPublicKeyInfo, 46*7dc08ffcSJunyu Lai RSAPublicKey, RSAPrivateKey, 47*7dc08ffcSJunyu Lai ECDSAPublicKey, ECDSAPrivateKey, 48*7dc08ffcSJunyu Lai RSAPrivateKey_OpenSSL, ECDSAPrivateKey_OpenSSL, 49*7dc08ffcSJunyu Lai X509_Cert, X509_CRL) 50*7dc08ffcSJunyu Laifrom scapy.layers.tls.crypto.pkcs1 import (pkcs_os2ip, pkcs_i2osp, _get_hash, 51*7dc08ffcSJunyu Lai _EncryptAndVerifyRSA, 52*7dc08ffcSJunyu Lai _DecryptAndSignRSA) 53*7dc08ffcSJunyu Lai 54*7dc08ffcSJunyu Laifrom scapy.compat import * 55*7dc08ffcSJunyu Lai 56*7dc08ffcSJunyu Lai# Maximum allowed size in bytes for a certificate file, to avoid 57*7dc08ffcSJunyu Lai# loading huge file when importing a cert 58*7dc08ffcSJunyu Lai_MAX_KEY_SIZE = 50*1024 59*7dc08ffcSJunyu Lai_MAX_CERT_SIZE = 50*1024 60*7dc08ffcSJunyu Lai_MAX_CRL_SIZE = 10*1024*1024 # some are that big 61*7dc08ffcSJunyu Lai 62*7dc08ffcSJunyu Lai 63*7dc08ffcSJunyu Lai##################################################################### 64*7dc08ffcSJunyu Lai# Some helpers 65*7dc08ffcSJunyu Lai##################################################################### 66*7dc08ffcSJunyu Lai 67*7dc08ffcSJunyu Lai@conf.commands.register 68*7dc08ffcSJunyu Laidef der2pem(der_string, obj="UNKNOWN"): 69*7dc08ffcSJunyu Lai """Convert DER octet string to PEM format (with optional header)""" 70*7dc08ffcSJunyu Lai # Encode a byte string in PEM format. Header advertizes <obj> type. 71*7dc08ffcSJunyu Lai pem_string = ("-----BEGIN %s-----\n" % obj).encode() 72*7dc08ffcSJunyu Lai base64_string = base64.b64encode(der_string) 73*7dc08ffcSJunyu Lai chunks = [base64_string[i:i+64] for i in range(0, len(base64_string), 64)] 74*7dc08ffcSJunyu Lai pem_string += b'\n'.join(chunks) 75*7dc08ffcSJunyu Lai pem_string += ("\n-----END %s-----\n" % obj).encode() 76*7dc08ffcSJunyu Lai return pem_string 77*7dc08ffcSJunyu Lai 78*7dc08ffcSJunyu Lai@conf.commands.register 79*7dc08ffcSJunyu Laidef pem2der(pem_string): 80*7dc08ffcSJunyu Lai """Convert PEM string to DER format""" 81*7dc08ffcSJunyu Lai # Encode all lines between the first '-----\n' and the 2nd-to-last '-----'. 82*7dc08ffcSJunyu Lai pem_string = pem_string.replace(b"\r", b"") 83*7dc08ffcSJunyu Lai first_idx = pem_string.find(b"-----\n") + 6 84*7dc08ffcSJunyu Lai if pem_string.find(b"-----BEGIN", first_idx) != -1: 85*7dc08ffcSJunyu Lai raise Exception("pem2der() expects only one PEM-encoded object") 86*7dc08ffcSJunyu Lai last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----")) 87*7dc08ffcSJunyu Lai base64_string = pem_string[first_idx:last_idx] 88*7dc08ffcSJunyu Lai base64_string.replace(b"\n", b"") 89*7dc08ffcSJunyu Lai der_string = base64.b64decode(base64_string) 90*7dc08ffcSJunyu Lai return der_string 91*7dc08ffcSJunyu Lai 92*7dc08ffcSJunyu Laidef split_pem(s): 93*7dc08ffcSJunyu Lai """ 94*7dc08ffcSJunyu Lai Split PEM objects. Useful to process concatenated certificates. 95*7dc08ffcSJunyu Lai """ 96*7dc08ffcSJunyu Lai pem_strings = [] 97*7dc08ffcSJunyu Lai while s != b"": 98*7dc08ffcSJunyu Lai start_idx = s.find(b"-----BEGIN") 99*7dc08ffcSJunyu Lai if start_idx == -1: 100*7dc08ffcSJunyu Lai break 101*7dc08ffcSJunyu Lai end_idx = s.find(b"-----END") 102*7dc08ffcSJunyu Lai end_idx = s.find(b"\n", end_idx) + 1 103*7dc08ffcSJunyu Lai pem_strings.append(s[start_idx:end_idx]) 104*7dc08ffcSJunyu Lai s = s[end_idx:] 105*7dc08ffcSJunyu Lai return pem_strings 106*7dc08ffcSJunyu Lai 107*7dc08ffcSJunyu Lai 108*7dc08ffcSJunyu Laiclass _PKIObj(object): 109*7dc08ffcSJunyu Lai def __init__(self, frmt, der, pem): 110*7dc08ffcSJunyu Lai # Note that changing attributes of the _PKIObj does not update these 111*7dc08ffcSJunyu Lai # values (e.g. modifying k.modulus does not change k.der). 112*7dc08ffcSJunyu Lai #XXX use __setattr__ for this 113*7dc08ffcSJunyu Lai self.frmt = frmt 114*7dc08ffcSJunyu Lai self.der = der 115*7dc08ffcSJunyu Lai self.pem = pem 116*7dc08ffcSJunyu Lai 117*7dc08ffcSJunyu Lai def __str__(self): 118*7dc08ffcSJunyu Lai return self.der 119*7dc08ffcSJunyu Lai 120*7dc08ffcSJunyu Lai 121*7dc08ffcSJunyu Laiclass _PKIObjMaker(type): 122*7dc08ffcSJunyu Lai def __call__(cls, obj_path, obj_max_size, pem_marker=None): 123*7dc08ffcSJunyu Lai # This enables transparent DER and PEM-encoded data imports. 124*7dc08ffcSJunyu Lai # Note that when importing a PEM file with multiple objects (like ECDSA 125*7dc08ffcSJunyu Lai # private keys output by openssl), it will concatenate every object in 126*7dc08ffcSJunyu Lai # order to create a 'der' attribute. When converting a 'multi' DER file 127*7dc08ffcSJunyu Lai # into a PEM file, though, the PEM attribute will not be valid, 128*7dc08ffcSJunyu Lai # because we do not try to identify the class of each object. 129*7dc08ffcSJunyu Lai error_msg = "Unable to import data" 130*7dc08ffcSJunyu Lai 131*7dc08ffcSJunyu Lai if obj_path is None: 132*7dc08ffcSJunyu Lai raise Exception(error_msg) 133*7dc08ffcSJunyu Lai obj_path = raw(obj_path) 134*7dc08ffcSJunyu Lai 135*7dc08ffcSJunyu Lai if (not b'\x00' in obj_path) and os.path.isfile(obj_path): 136*7dc08ffcSJunyu Lai _size = os.path.getsize(obj_path) 137*7dc08ffcSJunyu Lai if _size > obj_max_size: 138*7dc08ffcSJunyu Lai raise Exception(error_msg) 139*7dc08ffcSJunyu Lai try: 140*7dc08ffcSJunyu Lai f = open(obj_path, "rb") 141*7dc08ffcSJunyu Lai _raw = f.read() 142*7dc08ffcSJunyu Lai f.close() 143*7dc08ffcSJunyu Lai except: 144*7dc08ffcSJunyu Lai raise Exception(error_msg) 145*7dc08ffcSJunyu Lai else: 146*7dc08ffcSJunyu Lai _raw = obj_path 147*7dc08ffcSJunyu Lai 148*7dc08ffcSJunyu Lai try: 149*7dc08ffcSJunyu Lai if b"-----BEGIN" in _raw: 150*7dc08ffcSJunyu Lai frmt = "PEM" 151*7dc08ffcSJunyu Lai pem = _raw 152*7dc08ffcSJunyu Lai der_list = split_pem(_raw) 153*7dc08ffcSJunyu Lai der = b''.join(map(pem2der, der_list)) 154*7dc08ffcSJunyu Lai else: 155*7dc08ffcSJunyu Lai frmt = "DER" 156*7dc08ffcSJunyu Lai der = _raw 157*7dc08ffcSJunyu Lai pem = "" 158*7dc08ffcSJunyu Lai if pem_marker is not None: 159*7dc08ffcSJunyu Lai pem = der2pem(_raw, pem_marker) 160*7dc08ffcSJunyu Lai # type identification may be needed for pem_marker 161*7dc08ffcSJunyu Lai # in such case, the pem attribute has to be updated 162*7dc08ffcSJunyu Lai except: 163*7dc08ffcSJunyu Lai raise Exception(error_msg) 164*7dc08ffcSJunyu Lai 165*7dc08ffcSJunyu Lai p = _PKIObj(frmt, der, pem) 166*7dc08ffcSJunyu Lai return p 167*7dc08ffcSJunyu Lai 168*7dc08ffcSJunyu Lai 169*7dc08ffcSJunyu Lai##################################################################### 170*7dc08ffcSJunyu Lai# PKI objects wrappers 171*7dc08ffcSJunyu Lai##################################################################### 172*7dc08ffcSJunyu Lai 173*7dc08ffcSJunyu Lai############### 174*7dc08ffcSJunyu Lai# Public Keys # 175*7dc08ffcSJunyu Lai############### 176*7dc08ffcSJunyu Lai 177*7dc08ffcSJunyu Laiclass _PubKeyFactory(_PKIObjMaker): 178*7dc08ffcSJunyu Lai """ 179*7dc08ffcSJunyu Lai Metaclass for PubKey creation. 180*7dc08ffcSJunyu Lai It casts the appropriate class on the fly, then fills in 181*7dc08ffcSJunyu Lai the appropriate attributes with import_from_asn1pkt() submethod. 182*7dc08ffcSJunyu Lai """ 183*7dc08ffcSJunyu Lai def __call__(cls, key_path=None): 184*7dc08ffcSJunyu Lai 185*7dc08ffcSJunyu Lai if key_path is None: 186*7dc08ffcSJunyu Lai obj = type.__call__(cls) 187*7dc08ffcSJunyu Lai if cls is PubKey: 188*7dc08ffcSJunyu Lai cls = PubKeyRSA 189*7dc08ffcSJunyu Lai obj.__class__ = cls 190*7dc08ffcSJunyu Lai obj.frmt = "original" 191*7dc08ffcSJunyu Lai obj.fill_and_store() 192*7dc08ffcSJunyu Lai return obj 193*7dc08ffcSJunyu Lai 194*7dc08ffcSJunyu Lai # This deals with the rare RSA 'kx export' call. 195*7dc08ffcSJunyu Lai if isinstance(key_path, tuple): 196*7dc08ffcSJunyu Lai obj = type.__call__(cls) 197*7dc08ffcSJunyu Lai obj.__class__ = PubKeyRSA 198*7dc08ffcSJunyu Lai obj.frmt = "tuple" 199*7dc08ffcSJunyu Lai obj.import_from_tuple(key_path) 200*7dc08ffcSJunyu Lai return obj 201*7dc08ffcSJunyu Lai 202*7dc08ffcSJunyu Lai # Now for the usual calls, key_path may be the path to either: 203*7dc08ffcSJunyu Lai # _an X509_SubjectPublicKeyInfo, as processed by openssl; 204*7dc08ffcSJunyu Lai # _an RSAPublicKey; 205*7dc08ffcSJunyu Lai # _an ECDSAPublicKey. 206*7dc08ffcSJunyu Lai obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE) 207*7dc08ffcSJunyu Lai try: 208*7dc08ffcSJunyu Lai spki = X509_SubjectPublicKeyInfo(obj.der) 209*7dc08ffcSJunyu Lai pubkey = spki.subjectPublicKey 210*7dc08ffcSJunyu Lai if isinstance(pubkey, RSAPublicKey): 211*7dc08ffcSJunyu Lai obj.__class__ = PubKeyRSA 212*7dc08ffcSJunyu Lai obj.import_from_asn1pkt(pubkey) 213*7dc08ffcSJunyu Lai elif isinstance(pubkey, ECDSAPublicKey): 214*7dc08ffcSJunyu Lai obj.__class__ = PubKeyECDSA 215*7dc08ffcSJunyu Lai try: 216*7dc08ffcSJunyu Lai obj.import_from_der(obj.der) 217*7dc08ffcSJunyu Lai except ImportError: 218*7dc08ffcSJunyu Lai pass 219*7dc08ffcSJunyu Lai else: 220*7dc08ffcSJunyu Lai raise 221*7dc08ffcSJunyu Lai marker = b"PUBLIC KEY" 222*7dc08ffcSJunyu Lai except: 223*7dc08ffcSJunyu Lai try: 224*7dc08ffcSJunyu Lai pubkey = RSAPublicKey(obj.der) 225*7dc08ffcSJunyu Lai obj.__class__ = PubKeyRSA 226*7dc08ffcSJunyu Lai obj.import_from_asn1pkt(pubkey) 227*7dc08ffcSJunyu Lai marker = b"RSA PUBLIC KEY" 228*7dc08ffcSJunyu Lai except: 229*7dc08ffcSJunyu Lai # We cannot import an ECDSA public key without curve knowledge 230*7dc08ffcSJunyu Lai raise Exception("Unable to import public key") 231*7dc08ffcSJunyu Lai 232*7dc08ffcSJunyu Lai if obj.frmt == "DER": 233*7dc08ffcSJunyu Lai obj.pem = der2pem(obj.der, marker) 234*7dc08ffcSJunyu Lai return obj 235*7dc08ffcSJunyu Lai 236*7dc08ffcSJunyu Lai 237*7dc08ffcSJunyu Laiclass PubKey(six.with_metaclass(_PubKeyFactory, object)): 238*7dc08ffcSJunyu Lai """ 239*7dc08ffcSJunyu Lai Parent class for both PubKeyRSA and PubKeyECDSA. 240*7dc08ffcSJunyu Lai Provides a common verifyCert() method. 241*7dc08ffcSJunyu Lai """ 242*7dc08ffcSJunyu Lai 243*7dc08ffcSJunyu Lai def verifyCert(self, cert): 244*7dc08ffcSJunyu Lai """ Verifies either a Cert or an X509_Cert. """ 245*7dc08ffcSJunyu Lai tbsCert = cert.tbsCertificate 246*7dc08ffcSJunyu Lai sigAlg = tbsCert.signature 247*7dc08ffcSJunyu Lai h = hash_by_oid[sigAlg.algorithm.val] 248*7dc08ffcSJunyu Lai sigVal = raw(cert.signatureValue) 249*7dc08ffcSJunyu Lai return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs') 250*7dc08ffcSJunyu Lai 251*7dc08ffcSJunyu Lai 252*7dc08ffcSJunyu Laiclass PubKeyRSA(PubKey, _EncryptAndVerifyRSA): 253*7dc08ffcSJunyu Lai """ 254*7dc08ffcSJunyu Lai Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py 255*7dc08ffcSJunyu Lai Use the 'key' attribute to access original object. 256*7dc08ffcSJunyu Lai """ 257*7dc08ffcSJunyu Lai @crypto_validator 258*7dc08ffcSJunyu Lai def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None): 259*7dc08ffcSJunyu Lai pubExp = pubExp or 65537 260*7dc08ffcSJunyu Lai if not modulus: 261*7dc08ffcSJunyu Lai real_modulusLen = modulusLen or 2048 262*7dc08ffcSJunyu Lai private_key = rsa.generate_private_key(public_exponent=pubExp, 263*7dc08ffcSJunyu Lai key_size=real_modulusLen, 264*7dc08ffcSJunyu Lai backend=default_backend()) 265*7dc08ffcSJunyu Lai self.pubkey = private_key.public_key() 266*7dc08ffcSJunyu Lai else: 267*7dc08ffcSJunyu Lai real_modulusLen = len(binrepr(modulus)) 268*7dc08ffcSJunyu Lai if modulusLen and real_modulusLen != modulusLen: 269*7dc08ffcSJunyu Lai warning("modulus and modulusLen do not match!") 270*7dc08ffcSJunyu Lai pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp) 271*7dc08ffcSJunyu Lai self.pubkey = pubNum.public_key(default_backend()) 272*7dc08ffcSJunyu Lai # Lines below are only useful for the legacy part of pkcs1.py 273*7dc08ffcSJunyu Lai pubNum = self.pubkey.public_numbers() 274*7dc08ffcSJunyu Lai self._modulusLen = real_modulusLen 275*7dc08ffcSJunyu Lai self._modulus = pubNum.n 276*7dc08ffcSJunyu Lai self._pubExp = pubNum.e 277*7dc08ffcSJunyu Lai 278*7dc08ffcSJunyu Lai @crypto_validator 279*7dc08ffcSJunyu Lai def import_from_tuple(self, tup): 280*7dc08ffcSJunyu Lai # this is rarely used 281*7dc08ffcSJunyu Lai e, m, mLen = tup 282*7dc08ffcSJunyu Lai if isinstance(m, bytes): 283*7dc08ffcSJunyu Lai m = pkcs_os2ip(m) 284*7dc08ffcSJunyu Lai if isinstance(e, bytes): 285*7dc08ffcSJunyu Lai e = pkcs_os2ip(e) 286*7dc08ffcSJunyu Lai self.fill_and_store(modulus=m, pubExp=e) 287*7dc08ffcSJunyu Lai self.pem = self.pubkey.public_bytes( 288*7dc08ffcSJunyu Lai encoding=serialization.Encoding.PEM, 289*7dc08ffcSJunyu Lai format=serialization.PublicFormat.SubjectPublicKeyInfo) 290*7dc08ffcSJunyu Lai self.der = pem2der(self.pem) 291*7dc08ffcSJunyu Lai 292*7dc08ffcSJunyu Lai def import_from_asn1pkt(self, pubkey): 293*7dc08ffcSJunyu Lai modulus = pubkey.modulus.val 294*7dc08ffcSJunyu Lai pubExp = pubkey.publicExponent.val 295*7dc08ffcSJunyu Lai self.fill_and_store(modulus=modulus, pubExp=pubExp) 296*7dc08ffcSJunyu Lai 297*7dc08ffcSJunyu Lai def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): 298*7dc08ffcSJunyu Lai # no ECDSA encryption support, hence no ECDSA specific keywords here 299*7dc08ffcSJunyu Lai return _EncryptAndVerifyRSA.encrypt(self, msg, t, h, mgf, L) 300*7dc08ffcSJunyu Lai 301*7dc08ffcSJunyu Lai def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 302*7dc08ffcSJunyu Lai return _EncryptAndVerifyRSA.verify(self, msg, sig, t, h, mgf, L) 303*7dc08ffcSJunyu Lai 304*7dc08ffcSJunyu Laiclass PubKeyECDSA(PubKey): 305*7dc08ffcSJunyu Lai """ 306*7dc08ffcSJunyu Lai Wrapper for ECDSA keys based on the cryptography library. 307*7dc08ffcSJunyu Lai Use the 'key' attribute to access original object. 308*7dc08ffcSJunyu Lai """ 309*7dc08ffcSJunyu Lai @crypto_validator 310*7dc08ffcSJunyu Lai def fill_and_store(self, curve=None): 311*7dc08ffcSJunyu Lai curve = curve or ec.SECP256R1 312*7dc08ffcSJunyu Lai private_key = ec.generate_private_key(curve(), default_backend()) 313*7dc08ffcSJunyu Lai self.pubkey = private_key.public_key() 314*7dc08ffcSJunyu Lai 315*7dc08ffcSJunyu Lai @crypto_validator 316*7dc08ffcSJunyu Lai def import_from_der(self, pubkey): 317*7dc08ffcSJunyu Lai # No lib support for explicit curves nor compressed points. 318*7dc08ffcSJunyu Lai self.pubkey = serialization.load_der_public_key(pubkey, 319*7dc08ffcSJunyu Lai backend=default_backend()) 320*7dc08ffcSJunyu Lai 321*7dc08ffcSJunyu Lai def encrypt(self, msg, h="sha256", **kwargs): 322*7dc08ffcSJunyu Lai # cryptography lib does not support ECDSA encryption 323*7dc08ffcSJunyu Lai raise Exception("No ECDSA encryption support") 324*7dc08ffcSJunyu Lai 325*7dc08ffcSJunyu Lai @crypto_validator 326*7dc08ffcSJunyu Lai def verify(self, msg, sig, h="sha256", **kwargs): 327*7dc08ffcSJunyu Lai # 'sig' should be a DER-encoded signature, as per RFC 3279 328*7dc08ffcSJunyu Lai verifier = self.pubkey.verifier(sig, ec.ECDSA(_get_hash(h))) 329*7dc08ffcSJunyu Lai verifier.update(msg) 330*7dc08ffcSJunyu Lai return verifier.verify() 331*7dc08ffcSJunyu Lai 332*7dc08ffcSJunyu Lai 333*7dc08ffcSJunyu Lai################ 334*7dc08ffcSJunyu Lai# Private Keys # 335*7dc08ffcSJunyu Lai################ 336*7dc08ffcSJunyu Lai 337*7dc08ffcSJunyu Laiclass _PrivKeyFactory(_PKIObjMaker): 338*7dc08ffcSJunyu Lai """ 339*7dc08ffcSJunyu Lai Metaclass for PrivKey creation. 340*7dc08ffcSJunyu Lai It casts the appropriate class on the fly, then fills in 341*7dc08ffcSJunyu Lai the appropriate attributes with import_from_asn1pkt() submethod. 342*7dc08ffcSJunyu Lai """ 343*7dc08ffcSJunyu Lai def __call__(cls, key_path=None): 344*7dc08ffcSJunyu Lai """ 345*7dc08ffcSJunyu Lai key_path may be the path to either: 346*7dc08ffcSJunyu Lai _an RSAPrivateKey_OpenSSL (as generated by openssl); 347*7dc08ffcSJunyu Lai _an ECDSAPrivateKey_OpenSSL (as generated by openssl); 348*7dc08ffcSJunyu Lai _an RSAPrivateKey; 349*7dc08ffcSJunyu Lai _an ECDSAPrivateKey. 350*7dc08ffcSJunyu Lai """ 351*7dc08ffcSJunyu Lai if key_path is None: 352*7dc08ffcSJunyu Lai obj = type.__call__(cls) 353*7dc08ffcSJunyu Lai if cls is PrivKey: 354*7dc08ffcSJunyu Lai cls = PrivKeyECDSA 355*7dc08ffcSJunyu Lai obj.__class__ = cls 356*7dc08ffcSJunyu Lai obj.frmt = "original" 357*7dc08ffcSJunyu Lai obj.fill_and_store() 358*7dc08ffcSJunyu Lai return obj 359*7dc08ffcSJunyu Lai 360*7dc08ffcSJunyu Lai obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE) 361*7dc08ffcSJunyu Lai multiPEM = False 362*7dc08ffcSJunyu Lai try: 363*7dc08ffcSJunyu Lai privkey = RSAPrivateKey_OpenSSL(obj.der) 364*7dc08ffcSJunyu Lai privkey = privkey.privateKey 365*7dc08ffcSJunyu Lai obj.__class__ = PrivKeyRSA 366*7dc08ffcSJunyu Lai marker = b"PRIVATE KEY" 367*7dc08ffcSJunyu Lai except: 368*7dc08ffcSJunyu Lai try: 369*7dc08ffcSJunyu Lai privkey = ECDSAPrivateKey_OpenSSL(obj.der) 370*7dc08ffcSJunyu Lai privkey = privkey.privateKey 371*7dc08ffcSJunyu Lai obj.__class__ = PrivKeyECDSA 372*7dc08ffcSJunyu Lai marker = b"EC PRIVATE KEY" 373*7dc08ffcSJunyu Lai multiPEM = True 374*7dc08ffcSJunyu Lai except: 375*7dc08ffcSJunyu Lai try: 376*7dc08ffcSJunyu Lai privkey = RSAPrivateKey(obj.der) 377*7dc08ffcSJunyu Lai obj.__class__ = PrivKeyRSA 378*7dc08ffcSJunyu Lai marker = b"RSA PRIVATE KEY" 379*7dc08ffcSJunyu Lai except: 380*7dc08ffcSJunyu Lai try: 381*7dc08ffcSJunyu Lai privkey = ECDSAPrivateKey(obj.der) 382*7dc08ffcSJunyu Lai obj.__class__ = PrivKeyECDSA 383*7dc08ffcSJunyu Lai marker = b"EC PRIVATE KEY" 384*7dc08ffcSJunyu Lai except: 385*7dc08ffcSJunyu Lai raise Exception("Unable to import private key") 386*7dc08ffcSJunyu Lai try: 387*7dc08ffcSJunyu Lai obj.import_from_asn1pkt(privkey) 388*7dc08ffcSJunyu Lai except ImportError: 389*7dc08ffcSJunyu Lai pass 390*7dc08ffcSJunyu Lai 391*7dc08ffcSJunyu Lai if obj.frmt == "DER": 392*7dc08ffcSJunyu Lai if multiPEM: 393*7dc08ffcSJunyu Lai # this does not restore the EC PARAMETERS header 394*7dc08ffcSJunyu Lai obj.pem = der2pem(raw(privkey), marker) 395*7dc08ffcSJunyu Lai else: 396*7dc08ffcSJunyu Lai obj.pem = der2pem(obj.der, marker) 397*7dc08ffcSJunyu Lai return obj 398*7dc08ffcSJunyu Lai 399*7dc08ffcSJunyu Lai 400*7dc08ffcSJunyu Laiclass PrivKey(six.with_metaclass(_PrivKeyFactory, object)): 401*7dc08ffcSJunyu Lai """ 402*7dc08ffcSJunyu Lai Parent class for both PrivKeyRSA and PrivKeyECDSA. 403*7dc08ffcSJunyu Lai Provides common signTBSCert() and resignCert() methods. 404*7dc08ffcSJunyu Lai """ 405*7dc08ffcSJunyu Lai 406*7dc08ffcSJunyu Lai def signTBSCert(self, tbsCert, h="sha256"): 407*7dc08ffcSJunyu Lai """ 408*7dc08ffcSJunyu Lai Note that this will always copy the signature field from the 409*7dc08ffcSJunyu Lai tbsCertificate into the signatureAlgorithm field of the result, 410*7dc08ffcSJunyu Lai regardless of the coherence between its contents (which might 411*7dc08ffcSJunyu Lai indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2). 412*7dc08ffcSJunyu Lai 413*7dc08ffcSJunyu Lai There is a small inheritance trick for the computation of sigVal 414*7dc08ffcSJunyu Lai below: in order to use a sign() method which would apply 415*7dc08ffcSJunyu Lai to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the 416*7dc08ffcSJunyu Lai subclasses accept any argument, be it from the RSA or ECDSA world, 417*7dc08ffcSJunyu Lai and then they keep the ones they're interested in. 418*7dc08ffcSJunyu Lai Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign(). 419*7dc08ffcSJunyu Lai """ 420*7dc08ffcSJunyu Lai sigAlg = tbsCert.signature 421*7dc08ffcSJunyu Lai h = h or hash_by_oid[sigAlg.algorithm.val] 422*7dc08ffcSJunyu Lai sigVal = self.sign(raw(tbsCert), h=h, t='pkcs') 423*7dc08ffcSJunyu Lai c = X509_Cert() 424*7dc08ffcSJunyu Lai c.tbsCertificate = tbsCert 425*7dc08ffcSJunyu Lai c.signatureAlgorithm = sigAlg 426*7dc08ffcSJunyu Lai c.signatureValue = ASN1_BIT_STRING(sigVal, readable=True) 427*7dc08ffcSJunyu Lai return c 428*7dc08ffcSJunyu Lai 429*7dc08ffcSJunyu Lai def resignCert(self, cert): 430*7dc08ffcSJunyu Lai """ Rewrite the signature of either a Cert or an X509_Cert. """ 431*7dc08ffcSJunyu Lai return self.signTBSCert(cert.tbsCertificate) 432*7dc08ffcSJunyu Lai 433*7dc08ffcSJunyu Lai def verifyCert(self, cert): 434*7dc08ffcSJunyu Lai """ Verifies either a Cert or an X509_Cert. """ 435*7dc08ffcSJunyu Lai tbsCert = cert.tbsCertificate 436*7dc08ffcSJunyu Lai sigAlg = tbsCert.signature 437*7dc08ffcSJunyu Lai h = hash_by_oid[sigAlg.algorithm.val] 438*7dc08ffcSJunyu Lai sigVal = raw(cert.signatureValue) 439*7dc08ffcSJunyu Lai return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs') 440*7dc08ffcSJunyu Lai 441*7dc08ffcSJunyu Lai 442*7dc08ffcSJunyu Laiclass PrivKeyRSA(PrivKey, _EncryptAndVerifyRSA, _DecryptAndSignRSA): 443*7dc08ffcSJunyu Lai """ 444*7dc08ffcSJunyu Lai Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py 445*7dc08ffcSJunyu Lai Use the 'key' attribute to access original object. 446*7dc08ffcSJunyu Lai """ 447*7dc08ffcSJunyu Lai @crypto_validator 448*7dc08ffcSJunyu Lai def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None, 449*7dc08ffcSJunyu Lai prime1=None, prime2=None, coefficient=None, 450*7dc08ffcSJunyu Lai exponent1=None, exponent2=None, privExp=None): 451*7dc08ffcSJunyu Lai pubExp = pubExp or 65537 452*7dc08ffcSJunyu Lai if None in [modulus, prime1, prime2, coefficient, privExp, 453*7dc08ffcSJunyu Lai exponent1, exponent2]: 454*7dc08ffcSJunyu Lai # note that the library requires every parameter 455*7dc08ffcSJunyu Lai # in order to call RSAPrivateNumbers(...) 456*7dc08ffcSJunyu Lai # if one of these is missing, we generate a whole new key 457*7dc08ffcSJunyu Lai real_modulusLen = modulusLen or 2048 458*7dc08ffcSJunyu Lai self.key = rsa.generate_private_key(public_exponent=pubExp, 459*7dc08ffcSJunyu Lai key_size=real_modulusLen, 460*7dc08ffcSJunyu Lai backend=default_backend()) 461*7dc08ffcSJunyu Lai self.pubkey = self.key.public_key() 462*7dc08ffcSJunyu Lai else: 463*7dc08ffcSJunyu Lai real_modulusLen = len(binrepr(modulus)) 464*7dc08ffcSJunyu Lai if modulusLen and real_modulusLen != modulusLen: 465*7dc08ffcSJunyu Lai warning("modulus and modulusLen do not match!") 466*7dc08ffcSJunyu Lai pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp) 467*7dc08ffcSJunyu Lai privNum = rsa.RSAPrivateNumbers(p=prime1, q=prime2, 468*7dc08ffcSJunyu Lai dmp1=exponent1, dmq1=exponent2, 469*7dc08ffcSJunyu Lai iqmp=coefficient, d=privExp, 470*7dc08ffcSJunyu Lai public_numbers=pubNum) 471*7dc08ffcSJunyu Lai self.key = privNum.private_key(default_backend()) 472*7dc08ffcSJunyu Lai self.pubkey = self.key.public_key() 473*7dc08ffcSJunyu Lai 474*7dc08ffcSJunyu Lai # Lines below are only useful for the legacy part of pkcs1.py 475*7dc08ffcSJunyu Lai pubNum = self.pubkey.public_numbers() 476*7dc08ffcSJunyu Lai self._modulusLen = real_modulusLen 477*7dc08ffcSJunyu Lai self._modulus = pubNum.n 478*7dc08ffcSJunyu Lai self._pubExp = pubNum.e 479*7dc08ffcSJunyu Lai 480*7dc08ffcSJunyu Lai def import_from_asn1pkt(self, privkey): 481*7dc08ffcSJunyu Lai modulus = privkey.modulus.val 482*7dc08ffcSJunyu Lai pubExp = privkey.publicExponent.val 483*7dc08ffcSJunyu Lai privExp = privkey.privateExponent.val 484*7dc08ffcSJunyu Lai prime1 = privkey.prime1.val 485*7dc08ffcSJunyu Lai prime2 = privkey.prime2.val 486*7dc08ffcSJunyu Lai exponent1 = privkey.exponent1.val 487*7dc08ffcSJunyu Lai exponent2 = privkey.exponent2.val 488*7dc08ffcSJunyu Lai coefficient = privkey.coefficient.val 489*7dc08ffcSJunyu Lai self.fill_and_store(modulus=modulus, pubExp=pubExp, 490*7dc08ffcSJunyu Lai privExp=privExp, prime1=prime1, prime2=prime2, 491*7dc08ffcSJunyu Lai exponent1=exponent1, exponent2=exponent2, 492*7dc08ffcSJunyu Lai coefficient=coefficient) 493*7dc08ffcSJunyu Lai 494*7dc08ffcSJunyu Lai def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 495*7dc08ffcSJunyu Lai # Let's copy this from PubKeyRSA instead of adding another baseclass :) 496*7dc08ffcSJunyu Lai return _EncryptAndVerifyRSA.verify(self, msg, sig, t, h, mgf, L) 497*7dc08ffcSJunyu Lai 498*7dc08ffcSJunyu Lai def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None): 499*7dc08ffcSJunyu Lai return _DecryptAndSignRSA.sign(self, data, t, h, mgf, L) 500*7dc08ffcSJunyu Lai 501*7dc08ffcSJunyu Lai 502*7dc08ffcSJunyu Laiclass PrivKeyECDSA(PrivKey): 503*7dc08ffcSJunyu Lai """ 504*7dc08ffcSJunyu Lai Wrapper for ECDSA keys based on SigningKey from ecdsa library. 505*7dc08ffcSJunyu Lai Use the 'key' attribute to access original object. 506*7dc08ffcSJunyu Lai """ 507*7dc08ffcSJunyu Lai @crypto_validator 508*7dc08ffcSJunyu Lai def fill_and_store(self, curve=None): 509*7dc08ffcSJunyu Lai curve = curve or ec.SECP256R1 510*7dc08ffcSJunyu Lai self.key = ec.generate_private_key(curve(), default_backend()) 511*7dc08ffcSJunyu Lai self.pubkey = self.key.public_key() 512*7dc08ffcSJunyu Lai 513*7dc08ffcSJunyu Lai @crypto_validator 514*7dc08ffcSJunyu Lai def import_from_asn1pkt(self, privkey): 515*7dc08ffcSJunyu Lai self.key = serialization.load_der_private_key(raw(privkey), None, 516*7dc08ffcSJunyu Lai backend=default_backend()) 517*7dc08ffcSJunyu Lai self.pubkey = self.key.public_key() 518*7dc08ffcSJunyu Lai 519*7dc08ffcSJunyu Lai @crypto_validator 520*7dc08ffcSJunyu Lai def verify(self, msg, sig, h="sha256", **kwargs): 521*7dc08ffcSJunyu Lai # 'sig' should be a DER-encoded signature, as per RFC 3279 522*7dc08ffcSJunyu Lai verifier = self.pubkey.verifier(sig, ec.ECDSA(_get_hash(h))) 523*7dc08ffcSJunyu Lai verifier.update(msg) 524*7dc08ffcSJunyu Lai return verifier.verify() 525*7dc08ffcSJunyu Lai 526*7dc08ffcSJunyu Lai @crypto_validator 527*7dc08ffcSJunyu Lai def sign(self, data, h="sha256", **kwargs): 528*7dc08ffcSJunyu Lai signer = self.key.signer(ec.ECDSA(_get_hash(h))) 529*7dc08ffcSJunyu Lai signer.update(data) 530*7dc08ffcSJunyu Lai return signer.finalize() 531*7dc08ffcSJunyu Lai 532*7dc08ffcSJunyu Lai 533*7dc08ffcSJunyu Lai################ 534*7dc08ffcSJunyu Lai# Certificates # 535*7dc08ffcSJunyu Lai################ 536*7dc08ffcSJunyu Lai 537*7dc08ffcSJunyu Laiclass _CertMaker(_PKIObjMaker): 538*7dc08ffcSJunyu Lai """ 539*7dc08ffcSJunyu Lai Metaclass for Cert creation. It is not necessary as it was for the keys, 540*7dc08ffcSJunyu Lai but we reuse the model instead of creating redundant constructors. 541*7dc08ffcSJunyu Lai """ 542*7dc08ffcSJunyu Lai def __call__(cls, cert_path): 543*7dc08ffcSJunyu Lai obj = _PKIObjMaker.__call__(cls, cert_path, 544*7dc08ffcSJunyu Lai _MAX_CERT_SIZE, "CERTIFICATE") 545*7dc08ffcSJunyu Lai obj.__class__ = Cert 546*7dc08ffcSJunyu Lai try: 547*7dc08ffcSJunyu Lai cert = X509_Cert(obj.der) 548*7dc08ffcSJunyu Lai except: 549*7dc08ffcSJunyu Lai raise Exception("Unable to import certificate") 550*7dc08ffcSJunyu Lai obj.import_from_asn1pkt(cert) 551*7dc08ffcSJunyu Lai return obj 552*7dc08ffcSJunyu Lai 553*7dc08ffcSJunyu Lai 554*7dc08ffcSJunyu Laiclass Cert(six.with_metaclass(_CertMaker, object)): 555*7dc08ffcSJunyu Lai """ 556*7dc08ffcSJunyu Lai Wrapper for the X509_Cert from layers/x509.py. 557*7dc08ffcSJunyu Lai Use the 'x509Cert' attribute to access original object. 558*7dc08ffcSJunyu Lai """ 559*7dc08ffcSJunyu Lai 560*7dc08ffcSJunyu Lai def import_from_asn1pkt(self, cert): 561*7dc08ffcSJunyu Lai error_msg = "Unable to import certificate" 562*7dc08ffcSJunyu Lai 563*7dc08ffcSJunyu Lai self.x509Cert = cert 564*7dc08ffcSJunyu Lai 565*7dc08ffcSJunyu Lai tbsCert = cert.tbsCertificate 566*7dc08ffcSJunyu Lai self.tbsCertificate = tbsCert 567*7dc08ffcSJunyu Lai 568*7dc08ffcSJunyu Lai if tbsCert.version: 569*7dc08ffcSJunyu Lai self.version = tbsCert.version.val + 1 570*7dc08ffcSJunyu Lai else: 571*7dc08ffcSJunyu Lai self.version = 1 572*7dc08ffcSJunyu Lai self.serial = tbsCert.serialNumber.val 573*7dc08ffcSJunyu Lai self.sigAlg = tbsCert.signature.algorithm.oidname 574*7dc08ffcSJunyu Lai self.issuer = tbsCert.get_issuer() 575*7dc08ffcSJunyu Lai self.issuer_str = tbsCert.get_issuer_str() 576*7dc08ffcSJunyu Lai self.issuer_hash = hash(self.issuer_str) 577*7dc08ffcSJunyu Lai self.subject = tbsCert.get_subject() 578*7dc08ffcSJunyu Lai self.subject_str = tbsCert.get_subject_str() 579*7dc08ffcSJunyu Lai self.subject_hash = hash(self.subject_str) 580*7dc08ffcSJunyu Lai 581*7dc08ffcSJunyu Lai self.notBefore_str = tbsCert.validity.not_before.pretty_time 582*7dc08ffcSJunyu Lai notBefore = tbsCert.validity.not_before.val 583*7dc08ffcSJunyu Lai if notBefore[-1] == "Z": 584*7dc08ffcSJunyu Lai notBefore = notBefore[:-1] 585*7dc08ffcSJunyu Lai try: 586*7dc08ffcSJunyu Lai self.notBefore = time.strptime(notBefore, "%y%m%d%H%M%S") 587*7dc08ffcSJunyu Lai except: 588*7dc08ffcSJunyu Lai raise Exception(error_msg) 589*7dc08ffcSJunyu Lai self.notBefore_str_simple = time.strftime("%x", self.notBefore) 590*7dc08ffcSJunyu Lai 591*7dc08ffcSJunyu Lai self.notAfter_str = tbsCert.validity.not_after.pretty_time 592*7dc08ffcSJunyu Lai notAfter = tbsCert.validity.not_after.val 593*7dc08ffcSJunyu Lai if notAfter[-1] == "Z": 594*7dc08ffcSJunyu Lai notAfter = notAfter[:-1] 595*7dc08ffcSJunyu Lai try: 596*7dc08ffcSJunyu Lai self.notAfter = time.strptime(notAfter, "%y%m%d%H%M%S") 597*7dc08ffcSJunyu Lai except: 598*7dc08ffcSJunyu Lai raise Exception(error_msg) 599*7dc08ffcSJunyu Lai self.notAfter_str_simple = time.strftime("%x", self.notAfter) 600*7dc08ffcSJunyu Lai 601*7dc08ffcSJunyu Lai self.pubKey = PubKey(raw(tbsCert.subjectPublicKeyInfo)) 602*7dc08ffcSJunyu Lai 603*7dc08ffcSJunyu Lai if tbsCert.extensions: 604*7dc08ffcSJunyu Lai for extn in tbsCert.extensions: 605*7dc08ffcSJunyu Lai if extn.extnID.oidname == "basicConstraints": 606*7dc08ffcSJunyu Lai self.cA = False 607*7dc08ffcSJunyu Lai if extn.extnValue.cA: 608*7dc08ffcSJunyu Lai self.cA = not (extn.extnValue.cA.val == 0) 609*7dc08ffcSJunyu Lai elif extn.extnID.oidname == "keyUsage": 610*7dc08ffcSJunyu Lai self.keyUsage = extn.extnValue.get_keyUsage() 611*7dc08ffcSJunyu Lai elif extn.extnID.oidname == "extKeyUsage": 612*7dc08ffcSJunyu Lai self.extKeyUsage = extn.extnValue.get_extendedKeyUsage() 613*7dc08ffcSJunyu Lai elif extn.extnID.oidname == "authorityKeyIdentifier": 614*7dc08ffcSJunyu Lai self.authorityKeyID = extn.extnValue.keyIdentifier.val 615*7dc08ffcSJunyu Lai 616*7dc08ffcSJunyu Lai self.signatureValue = raw(cert.signatureValue) 617*7dc08ffcSJunyu Lai self.signatureLen = len(self.signatureValue) 618*7dc08ffcSJunyu Lai 619*7dc08ffcSJunyu Lai def isIssuerCert(self, other): 620*7dc08ffcSJunyu Lai """ 621*7dc08ffcSJunyu Lai True if 'other' issued 'self', i.e.: 622*7dc08ffcSJunyu Lai - self.issuer == other.subject 623*7dc08ffcSJunyu Lai - self is signed by other 624*7dc08ffcSJunyu Lai """ 625*7dc08ffcSJunyu Lai if self.issuer_hash != other.subject_hash: 626*7dc08ffcSJunyu Lai return False 627*7dc08ffcSJunyu Lai return other.pubKey.verifyCert(self) 628*7dc08ffcSJunyu Lai 629*7dc08ffcSJunyu Lai def isSelfSigned(self): 630*7dc08ffcSJunyu Lai """ 631*7dc08ffcSJunyu Lai Return True if the certificate is self-signed: 632*7dc08ffcSJunyu Lai - issuer and subject are the same 633*7dc08ffcSJunyu Lai - the signature of the certificate is valid. 634*7dc08ffcSJunyu Lai """ 635*7dc08ffcSJunyu Lai if self.issuer_hash == self.subject_hash: 636*7dc08ffcSJunyu Lai return self.isIssuerCert(self) 637*7dc08ffcSJunyu Lai return False 638*7dc08ffcSJunyu Lai 639*7dc08ffcSJunyu Lai def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): 640*7dc08ffcSJunyu Lai # no ECDSA *encryption* support, hence only RSA specific keywords here 641*7dc08ffcSJunyu Lai return self.pubKey.encrypt(msg, t, h, mgf, L) 642*7dc08ffcSJunyu Lai 643*7dc08ffcSJunyu Lai def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 644*7dc08ffcSJunyu Lai return self.pubKey.verify(msg, sig, t, h, mgf, L) 645*7dc08ffcSJunyu Lai 646*7dc08ffcSJunyu Lai def remainingDays(self, now=None): 647*7dc08ffcSJunyu Lai """ 648*7dc08ffcSJunyu Lai Based on the value of notAfter field, returns the number of 649*7dc08ffcSJunyu Lai days the certificate will still be valid. The date used for the 650*7dc08ffcSJunyu Lai comparison is the current and local date, as returned by 651*7dc08ffcSJunyu Lai time.localtime(), except if 'now' argument is provided another 652*7dc08ffcSJunyu Lai one. 'now' argument can be given as either a time tuple or a string 653*7dc08ffcSJunyu Lai representing the date. Accepted format for the string version 654*7dc08ffcSJunyu Lai are: 655*7dc08ffcSJunyu Lai 656*7dc08ffcSJunyu Lai - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT' 657*7dc08ffcSJunyu Lai - '%m/%d/%y' e.g. '01/30/08' (less precise) 658*7dc08ffcSJunyu Lai 659*7dc08ffcSJunyu Lai If the certificate is no more valid at the date considered, then 660*7dc08ffcSJunyu Lai a negative value is returned representing the number of days 661*7dc08ffcSJunyu Lai since it has expired. 662*7dc08ffcSJunyu Lai 663*7dc08ffcSJunyu Lai The number of days is returned as a float to deal with the unlikely 664*7dc08ffcSJunyu Lai case of certificates that are still just valid. 665*7dc08ffcSJunyu Lai """ 666*7dc08ffcSJunyu Lai if now is None: 667*7dc08ffcSJunyu Lai now = time.localtime() 668*7dc08ffcSJunyu Lai elif isinstance(now, str): 669*7dc08ffcSJunyu Lai try: 670*7dc08ffcSJunyu Lai if '/' in now: 671*7dc08ffcSJunyu Lai now = time.strptime(now, '%m/%d/%y') 672*7dc08ffcSJunyu Lai else: 673*7dc08ffcSJunyu Lai now = time.strptime(now, '%b %d %H:%M:%S %Y %Z') 674*7dc08ffcSJunyu Lai except: 675*7dc08ffcSJunyu Lai warning("Bad time string provided, will use localtime() instead.") 676*7dc08ffcSJunyu Lai now = time.localtime() 677*7dc08ffcSJunyu Lai 678*7dc08ffcSJunyu Lai now = time.mktime(now) 679*7dc08ffcSJunyu Lai nft = time.mktime(self.notAfter) 680*7dc08ffcSJunyu Lai diff = (nft - now)/(24.*3600) 681*7dc08ffcSJunyu Lai return diff 682*7dc08ffcSJunyu Lai 683*7dc08ffcSJunyu Lai def isRevoked(self, crl_list): 684*7dc08ffcSJunyu Lai """ 685*7dc08ffcSJunyu Lai Given a list of trusted CRL (their signature has already been 686*7dc08ffcSJunyu Lai verified with trusted anchors), this function returns True if 687*7dc08ffcSJunyu Lai the certificate is marked as revoked by one of those CRL. 688*7dc08ffcSJunyu Lai 689*7dc08ffcSJunyu Lai Note that if the Certificate was on hold in a previous CRL and 690*7dc08ffcSJunyu Lai is now valid again in a new CRL and bot are in the list, it 691*7dc08ffcSJunyu Lai will be considered revoked: this is because _all_ CRLs are 692*7dc08ffcSJunyu Lai checked (not only the freshest) and revocation status is not 693*7dc08ffcSJunyu Lai handled. 694*7dc08ffcSJunyu Lai 695*7dc08ffcSJunyu Lai Also note that the check on the issuer is performed on the 696*7dc08ffcSJunyu Lai Authority Key Identifier if available in _both_ the CRL and the 697*7dc08ffcSJunyu Lai Cert. Otherwise, the issuers are simply compared. 698*7dc08ffcSJunyu Lai """ 699*7dc08ffcSJunyu Lai for c in crl_list: 700*7dc08ffcSJunyu Lai if (self.authorityKeyID is not None and 701*7dc08ffcSJunyu Lai c.authorityKeyID is not None and 702*7dc08ffcSJunyu Lai self.authorityKeyID == c.authorityKeyID): 703*7dc08ffcSJunyu Lai return self.serial in (x[0] for x in c.revoked_cert_serials) 704*7dc08ffcSJunyu Lai elif self.issuer == c.issuer: 705*7dc08ffcSJunyu Lai return self.serial in (x[0] for x in c.revoked_cert_serials) 706*7dc08ffcSJunyu Lai return False 707*7dc08ffcSJunyu Lai 708*7dc08ffcSJunyu Lai def export(self, filename, fmt="DER"): 709*7dc08ffcSJunyu Lai """ 710*7dc08ffcSJunyu Lai Export certificate in 'fmt' format (DER or PEM) to file 'filename' 711*7dc08ffcSJunyu Lai """ 712*7dc08ffcSJunyu Lai f = open(filename, "wb") 713*7dc08ffcSJunyu Lai if fmt == "DER": 714*7dc08ffcSJunyu Lai f.write(self.der) 715*7dc08ffcSJunyu Lai elif fmt == "PEM": 716*7dc08ffcSJunyu Lai f.write(self.pem) 717*7dc08ffcSJunyu Lai f.close() 718*7dc08ffcSJunyu Lai 719*7dc08ffcSJunyu Lai def show(self): 720*7dc08ffcSJunyu Lai print("Serial: %s" % self.serial) 721*7dc08ffcSJunyu Lai print("Issuer: " + self.issuer_str) 722*7dc08ffcSJunyu Lai print("Subject: " + self.subject_str) 723*7dc08ffcSJunyu Lai print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str)) 724*7dc08ffcSJunyu Lai 725*7dc08ffcSJunyu Lai def __repr__(self): 726*7dc08ffcSJunyu Lai return "[X.509 Cert. Subject:%s, Issuer:%s]" % (self.subject_str, self.issuer_str) 727*7dc08ffcSJunyu Lai 728*7dc08ffcSJunyu Lai 729*7dc08ffcSJunyu Lai################################ 730*7dc08ffcSJunyu Lai# Certificate Revocation Lists # 731*7dc08ffcSJunyu Lai################################ 732*7dc08ffcSJunyu Lai 733*7dc08ffcSJunyu Laiclass _CRLMaker(_PKIObjMaker): 734*7dc08ffcSJunyu Lai """ 735*7dc08ffcSJunyu Lai Metaclass for CRL creation. It is not necessary as it was for the keys, 736*7dc08ffcSJunyu Lai but we reuse the model instead of creating redundant constructors. 737*7dc08ffcSJunyu Lai """ 738*7dc08ffcSJunyu Lai def __call__(cls, cert_path): 739*7dc08ffcSJunyu Lai obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL") 740*7dc08ffcSJunyu Lai obj.__class__ = CRL 741*7dc08ffcSJunyu Lai try: 742*7dc08ffcSJunyu Lai crl = X509_CRL(obj.der) 743*7dc08ffcSJunyu Lai except: 744*7dc08ffcSJunyu Lai raise Exception("Unable to import CRL") 745*7dc08ffcSJunyu Lai obj.import_from_asn1pkt(crl) 746*7dc08ffcSJunyu Lai return obj 747*7dc08ffcSJunyu Lai 748*7dc08ffcSJunyu Lai 749*7dc08ffcSJunyu Laiclass CRL(six.with_metaclass(_CRLMaker, object)): 750*7dc08ffcSJunyu Lai """ 751*7dc08ffcSJunyu Lai Wrapper for the X509_CRL from layers/x509.py. 752*7dc08ffcSJunyu Lai Use the 'x509CRL' attribute to access original object. 753*7dc08ffcSJunyu Lai """ 754*7dc08ffcSJunyu Lai 755*7dc08ffcSJunyu Lai def import_from_asn1pkt(self, crl): 756*7dc08ffcSJunyu Lai error_msg = "Unable to import CRL" 757*7dc08ffcSJunyu Lai 758*7dc08ffcSJunyu Lai self.x509CRL = crl 759*7dc08ffcSJunyu Lai 760*7dc08ffcSJunyu Lai tbsCertList = crl.tbsCertList 761*7dc08ffcSJunyu Lai self.tbsCertList = raw(tbsCertList) 762*7dc08ffcSJunyu Lai 763*7dc08ffcSJunyu Lai if tbsCertList.version: 764*7dc08ffcSJunyu Lai self.version = tbsCertList.version.val + 1 765*7dc08ffcSJunyu Lai else: 766*7dc08ffcSJunyu Lai self.version = 1 767*7dc08ffcSJunyu Lai self.sigAlg = tbsCertList.signature.algorithm.oidname 768*7dc08ffcSJunyu Lai self.issuer = tbsCertList.get_issuer() 769*7dc08ffcSJunyu Lai self.issuer_str = tbsCertList.get_issuer_str() 770*7dc08ffcSJunyu Lai self.issuer_hash = hash(self.issuer_str) 771*7dc08ffcSJunyu Lai 772*7dc08ffcSJunyu Lai self.lastUpdate_str = tbsCertList.this_update.pretty_time 773*7dc08ffcSJunyu Lai lastUpdate = tbsCertList.this_update.val 774*7dc08ffcSJunyu Lai if lastUpdate[-1] == "Z": 775*7dc08ffcSJunyu Lai lastUpdate = lastUpdate[:-1] 776*7dc08ffcSJunyu Lai try: 777*7dc08ffcSJunyu Lai self.lastUpdate = time.strptime(lastUpdate, "%y%m%d%H%M%S") 778*7dc08ffcSJunyu Lai except: 779*7dc08ffcSJunyu Lai raise Exception(error_msg) 780*7dc08ffcSJunyu Lai self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate) 781*7dc08ffcSJunyu Lai 782*7dc08ffcSJunyu Lai self.nextUpdate = None 783*7dc08ffcSJunyu Lai self.nextUpdate_str_simple = None 784*7dc08ffcSJunyu Lai if tbsCertList.next_update: 785*7dc08ffcSJunyu Lai self.nextUpdate_str = tbsCertList.next_update.pretty_time 786*7dc08ffcSJunyu Lai nextUpdate = tbsCertList.next_update.val 787*7dc08ffcSJunyu Lai if nextUpdate[-1] == "Z": 788*7dc08ffcSJunyu Lai nextUpdate = nextUpdate[:-1] 789*7dc08ffcSJunyu Lai try: 790*7dc08ffcSJunyu Lai self.nextUpdate = time.strptime(nextUpdate, "%y%m%d%H%M%S") 791*7dc08ffcSJunyu Lai except: 792*7dc08ffcSJunyu Lai raise Exception(error_msg) 793*7dc08ffcSJunyu Lai self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate) 794*7dc08ffcSJunyu Lai 795*7dc08ffcSJunyu Lai if tbsCertList.crlExtensions: 796*7dc08ffcSJunyu Lai for extension in tbsCertList.crlExtensions: 797*7dc08ffcSJunyu Lai if extension.extnID.oidname == "cRLNumber": 798*7dc08ffcSJunyu Lai self.number = extension.extnValue.cRLNumber.val 799*7dc08ffcSJunyu Lai 800*7dc08ffcSJunyu Lai revoked = [] 801*7dc08ffcSJunyu Lai if tbsCertList.revokedCertificates: 802*7dc08ffcSJunyu Lai for cert in tbsCertList.revokedCertificates: 803*7dc08ffcSJunyu Lai serial = cert.serialNumber.val 804*7dc08ffcSJunyu Lai date = cert.revocationDate.val 805*7dc08ffcSJunyu Lai if date[-1] == "Z": 806*7dc08ffcSJunyu Lai date = date[:-1] 807*7dc08ffcSJunyu Lai try: 808*7dc08ffcSJunyu Lai revocationDate = time.strptime(date, "%y%m%d%H%M%S") 809*7dc08ffcSJunyu Lai except: 810*7dc08ffcSJunyu Lai raise Exception(error_msg) 811*7dc08ffcSJunyu Lai revoked.append((serial, date)) 812*7dc08ffcSJunyu Lai self.revoked_cert_serials = revoked 813*7dc08ffcSJunyu Lai 814*7dc08ffcSJunyu Lai self.signatureValue = raw(crl.signatureValue) 815*7dc08ffcSJunyu Lai self.signatureLen = len(self.signatureValue) 816*7dc08ffcSJunyu Lai 817*7dc08ffcSJunyu Lai def isIssuerCert(self, other): 818*7dc08ffcSJunyu Lai # This is exactly the same thing as in Cert method. 819*7dc08ffcSJunyu Lai if self.issuer_hash != other.subject_hash: 820*7dc08ffcSJunyu Lai return False 821*7dc08ffcSJunyu Lai return other.pubKey.verifyCert(self) 822*7dc08ffcSJunyu Lai 823*7dc08ffcSJunyu Lai def verify(self, anchors): 824*7dc08ffcSJunyu Lai # Return True iff the CRL is signed by one of the provided anchors. 825*7dc08ffcSJunyu Lai for a in anchors: 826*7dc08ffcSJunyu Lai if self.isIssuerCert(a): 827*7dc08ffcSJunyu Lai return True 828*7dc08ffcSJunyu Lai return False 829*7dc08ffcSJunyu Lai 830*7dc08ffcSJunyu Lai def show(self): 831*7dc08ffcSJunyu Lai print("Version: %d" % self.version) 832*7dc08ffcSJunyu Lai print("sigAlg: " + self.sigAlg) 833*7dc08ffcSJunyu Lai print("Issuer: " + self.issuer_str) 834*7dc08ffcSJunyu Lai print("lastUpdate: %s" % self.lastUpdate_str) 835*7dc08ffcSJunyu Lai print("nextUpdate: %s" % self.nextUpdate_str) 836*7dc08ffcSJunyu Lai 837*7dc08ffcSJunyu Lai 838*7dc08ffcSJunyu Lai###################### 839*7dc08ffcSJunyu Lai# Certificate chains # 840*7dc08ffcSJunyu Lai###################### 841*7dc08ffcSJunyu Lai 842*7dc08ffcSJunyu Laiclass Chain(list): 843*7dc08ffcSJunyu Lai """ 844*7dc08ffcSJunyu Lai Basically, an enhanced array of Cert. 845*7dc08ffcSJunyu Lai """ 846*7dc08ffcSJunyu Lai def __init__(self, certList, cert0=None): 847*7dc08ffcSJunyu Lai """ 848*7dc08ffcSJunyu Lai Construct a chain of certificates starting with a self-signed 849*7dc08ffcSJunyu Lai certificate (or any certificate submitted by the user) 850*7dc08ffcSJunyu Lai and following issuer/subject matching and signature validity. 851*7dc08ffcSJunyu Lai If there is exactly one chain to be constructed, it will be, 852*7dc08ffcSJunyu Lai but if there are multiple potential chains, there is no guarantee 853*7dc08ffcSJunyu Lai that the retained one will be the longest one. 854*7dc08ffcSJunyu Lai As Cert and CRL classes both share an isIssuerCert() method, 855*7dc08ffcSJunyu Lai the trailing element of a Chain may alternatively be a CRL. 856*7dc08ffcSJunyu Lai 857*7dc08ffcSJunyu Lai Note that we do not check AKID/{SKID/issuer/serial} matching, 858*7dc08ffcSJunyu Lai nor the presence of keyCertSign in keyUsage extension (if present). 859*7dc08ffcSJunyu Lai """ 860*7dc08ffcSJunyu Lai list.__init__(self, ()) 861*7dc08ffcSJunyu Lai if cert0: 862*7dc08ffcSJunyu Lai self.append(cert0) 863*7dc08ffcSJunyu Lai else: 864*7dc08ffcSJunyu Lai for root_candidate in certList: 865*7dc08ffcSJunyu Lai if root_candidate.isSelfSigned(): 866*7dc08ffcSJunyu Lai self.append(root_candidate) 867*7dc08ffcSJunyu Lai certList.remove(root_candidate) 868*7dc08ffcSJunyu Lai break 869*7dc08ffcSJunyu Lai 870*7dc08ffcSJunyu Lai if len(self) > 0: 871*7dc08ffcSJunyu Lai while certList: 872*7dc08ffcSJunyu Lai l = len(self) 873*7dc08ffcSJunyu Lai for c in certList: 874*7dc08ffcSJunyu Lai if c.isIssuerCert(self[-1]): 875*7dc08ffcSJunyu Lai self.append(c) 876*7dc08ffcSJunyu Lai certList.remove(c) 877*7dc08ffcSJunyu Lai break 878*7dc08ffcSJunyu Lai if len(self) == l: 879*7dc08ffcSJunyu Lai # no new certificate appended to self 880*7dc08ffcSJunyu Lai break 881*7dc08ffcSJunyu Lai 882*7dc08ffcSJunyu Lai def verifyChain(self, anchors, untrusted=None): 883*7dc08ffcSJunyu Lai """ 884*7dc08ffcSJunyu Lai Perform verification of certificate chains for that certificate. 885*7dc08ffcSJunyu Lai A list of anchors is required. The certificates in the optional 886*7dc08ffcSJunyu Lai untrusted list may be used as additional elements to the final chain. 887*7dc08ffcSJunyu Lai On par with chain instantiation, only one chain constructed with the 888*7dc08ffcSJunyu Lai untrusted candidates will be retained. Eventually, dates are checked. 889*7dc08ffcSJunyu Lai """ 890*7dc08ffcSJunyu Lai untrusted = untrusted or [] 891*7dc08ffcSJunyu Lai for a in anchors: 892*7dc08ffcSJunyu Lai chain = Chain(self + untrusted, a) 893*7dc08ffcSJunyu Lai if len(chain) == 1: # anchor only 894*7dc08ffcSJunyu Lai continue 895*7dc08ffcSJunyu Lai # check that the chain does not exclusively rely on untrusted 896*7dc08ffcSJunyu Lai found = False 897*7dc08ffcSJunyu Lai for c in self: 898*7dc08ffcSJunyu Lai if c in chain[1:]: 899*7dc08ffcSJunyu Lai found = True 900*7dc08ffcSJunyu Lai if found: 901*7dc08ffcSJunyu Lai for c in chain: 902*7dc08ffcSJunyu Lai if c.remainingDays() < 0: 903*7dc08ffcSJunyu Lai break 904*7dc08ffcSJunyu Lai if c is chain[-1]: # we got to the end of the chain 905*7dc08ffcSJunyu Lai return chain 906*7dc08ffcSJunyu Lai return None 907*7dc08ffcSJunyu Lai 908*7dc08ffcSJunyu Lai def verifyChainFromCAFile(self, cafile, untrusted_file=None): 909*7dc08ffcSJunyu Lai """ 910*7dc08ffcSJunyu Lai Does the same job as .verifyChain() but using the list of anchors 911*7dc08ffcSJunyu Lai from the cafile. As for .verifyChain(), a list of untrusted 912*7dc08ffcSJunyu Lai certificates can be passed (as a file, this time). 913*7dc08ffcSJunyu Lai """ 914*7dc08ffcSJunyu Lai try: 915*7dc08ffcSJunyu Lai f = open(cafile) 916*7dc08ffcSJunyu Lai ca_certs = f.read() 917*7dc08ffcSJunyu Lai f.close() 918*7dc08ffcSJunyu Lai except: 919*7dc08ffcSJunyu Lai raise Exception("Could not read from cafile") 920*7dc08ffcSJunyu Lai 921*7dc08ffcSJunyu Lai anchors = [Cert(c) for c in split_pem(ca_certs)] 922*7dc08ffcSJunyu Lai 923*7dc08ffcSJunyu Lai untrusted = None 924*7dc08ffcSJunyu Lai if untrusted_file: 925*7dc08ffcSJunyu Lai try: 926*7dc08ffcSJunyu Lai f = open(untrusted_file) 927*7dc08ffcSJunyu Lai untrusted_certs = f.read() 928*7dc08ffcSJunyu Lai f.close() 929*7dc08ffcSJunyu Lai except: 930*7dc08ffcSJunyu Lai raise Exception("Could not read from untrusted_file") 931*7dc08ffcSJunyu Lai untrusted = [Cert(c) for c in split_pem(untrusted_certs)] 932*7dc08ffcSJunyu Lai 933*7dc08ffcSJunyu Lai return self.verifyChain(anchors, untrusted) 934*7dc08ffcSJunyu Lai 935*7dc08ffcSJunyu Lai def verifyChainFromCAPath(self, capath, untrusted_file=None): 936*7dc08ffcSJunyu Lai """ 937*7dc08ffcSJunyu Lai Does the same job as .verifyChainFromCAFile() but using the list 938*7dc08ffcSJunyu Lai of anchors in capath directory. The directory should (only) contain 939*7dc08ffcSJunyu Lai certificates files in PEM format. As for .verifyChainFromCAFile(), 940*7dc08ffcSJunyu Lai a list of untrusted certificates can be passed as a file 941*7dc08ffcSJunyu Lai (concatenation of the certificates in PEM format). 942*7dc08ffcSJunyu Lai """ 943*7dc08ffcSJunyu Lai try: 944*7dc08ffcSJunyu Lai anchors = [] 945*7dc08ffcSJunyu Lai for cafile in os.listdir(capath): 946*7dc08ffcSJunyu Lai anchors.append(Cert(open(cafile).read())) 947*7dc08ffcSJunyu Lai except: 948*7dc08ffcSJunyu Lai raise Exception("capath provided is not a valid cert path") 949*7dc08ffcSJunyu Lai 950*7dc08ffcSJunyu Lai untrusted = None 951*7dc08ffcSJunyu Lai if untrusted_file: 952*7dc08ffcSJunyu Lai try: 953*7dc08ffcSJunyu Lai f = open(untrusted_file) 954*7dc08ffcSJunyu Lai untrusted_certs = f.read() 955*7dc08ffcSJunyu Lai f.close() 956*7dc08ffcSJunyu Lai except: 957*7dc08ffcSJunyu Lai raise Exception("Could not read from untrusted_file") 958*7dc08ffcSJunyu Lai untrusted = [Cert(c) for c in split_pem(untrusted_certs)] 959*7dc08ffcSJunyu Lai 960*7dc08ffcSJunyu Lai return self.verifyChain(anchors, untrusted) 961*7dc08ffcSJunyu Lai 962*7dc08ffcSJunyu Lai def __repr__(self): 963*7dc08ffcSJunyu Lai llen = len(self) - 1 964*7dc08ffcSJunyu Lai if llen < 0: 965*7dc08ffcSJunyu Lai return "" 966*7dc08ffcSJunyu Lai c = self[0] 967*7dc08ffcSJunyu Lai s = "__ " 968*7dc08ffcSJunyu Lai if not c.isSelfSigned(): 969*7dc08ffcSJunyu Lai s += "%s [Not Self Signed]\n" % c.subject_str 970*7dc08ffcSJunyu Lai else: 971*7dc08ffcSJunyu Lai s += "%s [Self Signed]\n" % c.subject_str 972*7dc08ffcSJunyu Lai idx = 1 973*7dc08ffcSJunyu Lai while idx <= llen: 974*7dc08ffcSJunyu Lai c = self[idx] 975*7dc08ffcSJunyu Lai s += "%s\_ %s" % (" "*idx*2, c.subject_str) 976*7dc08ffcSJunyu Lai if idx != llen: 977*7dc08ffcSJunyu Lai s += "\n" 978*7dc08ffcSJunyu Lai idx += 1 979*7dc08ffcSJunyu Lai return s 980*7dc08ffcSJunyu Lai 981*7dc08ffcSJunyu Lai 982*7dc08ffcSJunyu Lai############################## 983*7dc08ffcSJunyu Lai# Certificate export helpers # 984*7dc08ffcSJunyu Lai############################## 985*7dc08ffcSJunyu Lai 986*7dc08ffcSJunyu Laidef _create_ca_file(anchor_list, filename): 987*7dc08ffcSJunyu Lai """ 988*7dc08ffcSJunyu Lai Concatenate all the certificates (PEM format for the export) in 989*7dc08ffcSJunyu Lai 'anchor_list' and write the result to file 'filename'. On success 990*7dc08ffcSJunyu Lai 'filename' is returned, None otherwise. 991*7dc08ffcSJunyu Lai 992*7dc08ffcSJunyu Lai If you are used to OpenSSL tools, this function builds a CAfile 993*7dc08ffcSJunyu Lai that can be used for certificate and CRL check. 994*7dc08ffcSJunyu Lai """ 995*7dc08ffcSJunyu Lai try: 996*7dc08ffcSJunyu Lai f = open(filename, "w") 997*7dc08ffcSJunyu Lai for a in anchor_list: 998*7dc08ffcSJunyu Lai s = a.output(fmt="PEM") 999*7dc08ffcSJunyu Lai f.write(s) 1000*7dc08ffcSJunyu Lai f.close() 1001*7dc08ffcSJunyu Lai except IOError: 1002*7dc08ffcSJunyu Lai return None 1003*7dc08ffcSJunyu Lai return filename 1004*7dc08ffcSJunyu Lai 1005