#!/usr/bin/env python3
# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This utility takes a JSON input that describes a CRLSet and produces a
CRLSet from it.

The input is taken on stdin and is a dict with the following keys:
  - BlockedBySPKI: An array of strings, where each string is a filename
      containing a PEM certificate, from which an SPKI will be extracted.
  - BlockedByHash: A dict of string to an array of strings. The dict key is
      a filename containing a PEM certificate, representing the issuer cert,
      while the array of strings contain the filenames of PEM format
      certificates whose serials are blocked.
  - LimitedSubjects: A dict of string to an array of strings, where the key is
      a filename containing a PEM format certificate, and the strings are the
      filenames of PEM format certificates. Certificates that share a Subject
      with the key will be restricted to the set of SPKIs extracted from the
      files in the values.
  - KnownInterceptionSPKIs: An optional array of strings, where each string
      is a filename containing a PEM certificate, from which an SPKI will be
      extracted and listed under the same key in the CRLSet header.
  - BlockedInterceptionSPKIs: An optional array of strings, where each string
      is a filename containing a PEM certificate, from which an SPKI will be
      extracted and listed under the same key in the CRLSet header.
  - Sequence: An optional integer sequence number to use for the CRLSet. If
      not present, defaults to 1.

For example:

{
  "BlockedBySPKI": ["/tmp/blocked-certificate"],
  "BlockedByHash": {
    "/tmp/intermediate-certificate": [
      "/tmp/blocked-leaf-certificate",
      "/tmp/blocked-leaf-certificate2"
    ]
  },
  "LimitedSubjects": {
    "/tmp/limited-certificate": [
      "/tmp/limited-certificate",
      "/tmp/limited-certificate2"
    ]
  },
  "Sequence": 23
}
"""

import base64
import collections
import hashlib
import json
import optparse
import six
import struct
import sys


def _pem_cert_to_binary(pem_filename):
    """Decodes the first PEM-encoded certificate in a given file into binary

    Args:
      pem_filename: A filename that contains a PEM-encoded certificate. It may
          contain additional data (keys, textual representation) which will be
          ignored

    Returns:
      A byte array containing the decoded certificate data. If the file holds
      no BEGIN CERTIFICATE marker the result is empty.
    """
    base64_lines = []
    started = False

    with open(pem_filename, 'r') as pem_file:
        for line in pem_file:
            if not started:
                if line.startswith('-----BEGIN CERTIFICATE'):
                    started = True
            elif line.startswith('-----END CERTIFICATE'):
                break
            else:
                # strip() removes the line terminator (if any) together with
                # stray whitespace; slicing off the last character first
                # would eat real data on a final line without a newline.
                base64_lines.append(line.strip())

    return base64.b64decode(''.join(base64_lines))


def _parse_asn1_element(der_bytes):
    """Parses a DER-encoded Tag/Length/Value into its component parts

    Args:
      der_bytes: A DER-encoded ASN.1 data type

    Returns:
      A tuple of the ASN.1 tag value, the length of the ASN.1 header that was
      read, the bytes of the complete element (header followed by value), and
      then any data from der_bytes that was not part of the Tag/Length/Value.
    """
    tag = six.indexbytes(der_bytes, 0)
    length = six.indexbytes(der_bytes, 1)
    header_length = 2

    if length & 0x80:
        # Long form: the low seven bits give the number of big-endian length
        # bytes that follow.
        num_length_bytes = length & 0x7f
        length = 0
        for i in range(2, 2 + num_length_bytes):
            length <<= 8
            length += six.indexbytes(der_bytes, i)
        header_length = 2 + num_length_bytes

    element_end = header_length + length
    contents = der_bytes[:element_end]
    rest = der_bytes[element_end:]

    return (tag, header_length, contents, rest)


class ASN1Iterator(object):
    """Iterator that parses and iterates through a ASN.1 DER structure

    Constructing the iterator parses the outermost element of the supplied
    bytes; step_into() and step_over() then move the current position.
    """

    def __init__(self, contents):
        self._tag = 0
        # A zero header length makes the first step_into() parse |contents|
        # from its very first byte.
        self._header_length = 0
        self._rest = None
        self._contents = contents
        self.step_into()

    def step_into(self):
        """Begins processing the inner contents of the next ASN.1 element"""
        (self._tag, self._header_length, self._contents, self._rest) = (
            _parse_asn1_element(self._contents[self._header_length:]))

    def step_over(self):
        """Skips/ignores the next ASN.1 element"""
        (self._tag, self._header_length, self._contents, self._rest) = (
            _parse_asn1_element(self._rest))

    def tag(self):
        """Returns the ASN.1 tag of the current element"""
        return self._tag

    def contents(self):
        """Returns the raw data of the current element (header included)"""
        return self._contents

    def encoded_value(self):
        """Returns the encoded value of the current element (i.e. without
        header)"""
        return self._contents[self._header_length:]


def _der_cert_to_spki(der_bytes):
    """Returns the subjectPublicKeyInfo of a DER-encoded certificate

    Args:
      der_bytes: A DER-encoded certificate (RFC 5280)

    Returns:
      A byte array containing the subjectPublicKeyInfo
    """
    iterator = ASN1Iterator(der_bytes)
    iterator.step_into()  # enter certificate structure
    iterator.step_into()  # enter TBSCertificate
    iterator.step_over()  # over version
    iterator.step_over()  # over serial
    iterator.step_over()  # over signature algorithm
    iterator.step_over()  # over issuer name
    iterator.step_over()  # over validity
    iterator.step_over()  # over subject name
    return iterator.contents()


def der_cert_to_spki_hash(der_cert):
    """Gets the SHA-256 hash of the subjectPublicKeyInfo of a DER encoded cert

    Args:
      der_cert: A string containing the DER-encoded certificate

    Returns:
      The SHA-256 hash of the certificate's subjectPublicKeyInfo, as a byte
      sequence
    """
    return hashlib.sha256(_der_cert_to_spki(der_cert)).digest()


def pem_cert_file_to_spki_hash(pem_filename):
    """Gets the SHA-256 hash of the subjectPublicKeyInfo of a cert in a file

    Args:
      pem_filename: A file containing a PEM-encoded certificate.

    Returns:
      The SHA-256 hash of the subjectPublicKeyInfo of the first certificate in
      the file, as a byte sequence
    """
    return der_cert_to_spki_hash(_pem_cert_to_binary(pem_filename))


def der_cert_to_subject_hash(der_bytes):
    """Returns SHA256(subject) of a DER-encoded certificate

    Args:
      der_bytes: A DER-encoded certificate (RFC 5280)

    Returns:
      The SHA-256 hash of the certificate's subject.
    """
    iterator = ASN1Iterator(der_bytes)
    iterator.step_into()  # enter certificate structure
    iterator.step_into()  # enter TBSCertificate
    iterator.step_over()  # over version
    iterator.step_over()  # over serial
    iterator.step_over()  # over signature algorithm
    iterator.step_over()  # over issuer name
    iterator.step_over()  # over validity
    return hashlib.sha256(iterator.contents()).digest()


def pem_cert_file_to_subject_hash(pem_filename):
    """Gets the SHA-256 hash of the subject of a cert in a file

    Args:
      pem_filename: A file containing a PEM-encoded certificate.

    Returns:
      The SHA-256 hash of the subject of the first certificate in the file, as
      a byte sequence
    """
    return der_cert_to_subject_hash(_pem_cert_to_binary(pem_filename))


def der_cert_to_serial(der_bytes):
    """Gets the serial of a DER-encoded certificate, omitting leading 0x00

    Args:
      der_bytes: A DER-encoded certificates (RFC 5280)

    Returns:
      The encoded serial number value (omitting tag and length), and omitting
      any leading 0x00 used to indicate it is a positive INTEGER.
    """
    iterator = ASN1Iterator(der_bytes)
    iterator.step_into()  # enter certificate structure
    iterator.step_into()  # enter TBSCertificate
    iterator.step_over()  # over version
    raw_serial = iterator.encoded_value()
    # Check the length before indexing so that a zero-length value is returned
    # as-is (and handled by the caller) instead of raising IndexError.
    if len(raw_serial) > 1 and six.indexbytes(raw_serial, 0) == 0x00:
        raw_serial = raw_serial[1:]
    return raw_serial


def pem_cert_file_to_serial(pem_filename):
    """Gets the DER-encoded serial of a cert in a file, omitting leading 0x00

    Args:
      pem_filename: A file containing a PEM-encoded certificate.

    Returns:
      The DER-encoded serial as a byte sequence
    """
    return der_cert_to_serial(_pem_cert_to_binary(pem_filename))


def _spki_hashes_b64(pem_filenames):
    """Returns the base64 SPKI hash of the first cert in each listed file"""
    return [
        base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii')
        for pem_file in pem_filenames
    ]


def _build_crlset(config):
    """Serializes a parsed JSON configuration into CRLSet bytes

    Args:
      config: The dict described in the module docstring.

    Returns:
      A byte sequence made of a little-endian 16-bit header length, the JSON
      header, and then one record per issuer from BlockedByHash: the issuer
      SPKI hash, a little-endian 32-bit serial count, and each serial prefixed
      by a one-byte length.
    """
    parents = {
        pem_cert_file_to_spki_hash(pem_file): [
            pem_cert_file_to_serial(issued_cert_file)
            for issued_cert_file in issued_certs
        ]
        for pem_file, issued_certs in config.get('BlockedByHash', {}).items()
    }
    limited_subjects = {
        base64.b64encode(
            pem_cert_file_to_subject_hash(pem_file)).decode('ascii'):
        _spki_hashes_b64(allowed_pems)
        for pem_file, allowed_pems in config.get('LimitedSubjects',
                                                 {}).items()
    }
    header_json = {
        'Version': 0,
        'ContentType': 'CRLSet',
        'Sequence': int(config.get("Sequence", 1)),
        'NumParents': len(parents),
        'BlockedSPKIs': _spki_hashes_b64(config.get('BlockedBySPKI', [])),
        'LimitedSubjects': limited_subjects,
        'KnownInterceptionSPKIs':
            _spki_hashes_b64(config.get('KnownInterceptionSPKIs', [])),
        'BlockedInterceptionSPKIs':
            _spki_hashes_b64(config.get('BlockedInterceptionSPKIs', [])),
    }
    # Measure the header after encoding so the length prefix always counts
    # the bytes that are actually written.
    header = json.dumps(header_json).encode('utf-8')

    chunks = [struct.pack('<H', len(header)), header]
    for spki, serials in sorted(parents.items()):
        chunks.append(spki)
        chunks.append(struct.pack('<I', len(serials)))
        for serial in serials:
            # An empty serial is written as a single zero byte.
            raw_serial = serial or b'\x00'
            chunks.append(struct.pack('<B', len(raw_serial)))
            chunks.append(raw_serial)
    return b''.join(chunks)


def main():
    """Reads the JSON description from stdin and writes the CRLSet

    The CRLSet goes to the file named by -o/--output, or to stdout when the
    option is absent or is '-'.

    Returns:
      0, for use as the process exit status.
    """
    parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
    parser.add_option('-o', '--output',
                      help='Specifies the output file. The default is stdout.')
    options, _ = parser.parse_args()

    config = json.load(sys.stdin)
    # Serialize fully before touching the destination so that a bad input
    # does not leave a truncated output file behind.
    crlset = _build_crlset(config)

    if options.output and options.output != '-':
        with open(options.output, 'wb') as outfile:
            outfile.write(crlset)
    else:
        # On Python 3 sys.stdout is a text stream; binary data has to go
        # through its underlying buffer.
        stdout = getattr(sys.stdout, 'buffer', sys.stdout)
        stdout.write(crlset)
        stdout.flush()
    return 0


if __name__ == '__main__':
    sys.exit(main())