1#!/usr/bin/env python3 2# Copyright 2018 The ChromiumOS Authors 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""A tool for running puffin tests in a corpus of deflate compressed files.""" 7 8import argparse 9import filecmp 10import logging 11import os 12import subprocess 13import sys 14import tempfile 15 16 17_PUFFHUFF = "puffhuff" 18_PUFFDIFF = "puffdiff" 19TESTS = (_PUFFHUFF, _PUFFDIFF) 20 21 22class Error(Exception): 23 """Puffin general processing error.""" 24 25 26def ParseArguments(argv): 27 """Parses and Validates command line arguments. 28 29 Args: 30 argv: command line arguments to parse. 31 32 Returns: 33 The arguments list. 34 """ 35 parser = argparse.ArgumentParser() 36 37 parser.add_argument( 38 "corpus", 39 metavar="CORPUS", 40 help="A corpus directory containing compressed files", 41 ) 42 parser.add_argument( 43 "-d", 44 "--disabled_tests", 45 default=(), 46 metavar="", 47 nargs="*", 48 help=( 49 "Space separated list of tests to disable. " 50 "Allowed options include: " + ", ".join(TESTS) 51 ), 52 choices=TESTS, 53 ) 54 parser.add_argument( 55 "--cache_size", 56 type=int, 57 metavar="SIZE", 58 help="The size (in bytes) of the cache for puffpatch " "operations.", 59 ) 60 parser.add_argument( 61 "--debug", action="store_true", help="Turns on verbosity." 62 ) 63 64 # Parse command-line arguments. 65 args = parser.parse_args(argv) 66 67 if not os.path.isdir(args.corpus): 68 raise Error( 69 "Corpus directory {} is non-existent or inaccesible".format( 70 args.corpus 71 ) 72 ) 73 return args 74 75 76def main(argv): 77 """The main function.""" 78 args = ParseArguments(argv[1:]) 79 80 if args.debug: 81 logging.getLogger().setLevel(logging.DEBUG) 82 83 # Construct list of appropriate files. 84 files = list( 85 filter( 86 os.path.isfile, 87 [os.path.join(args.corpus, f) for f in os.listdir(args.corpus)], 88 ) 89 ) 90 91 # For each file in corpus run puffhuff. 92 if _PUFFHUFF not in args.disabled_tests: 93 for src in files: 94 with tempfile.NamedTemporaryFile() as tgt_file: 95 operation = "puffhuff" 96 logging.debug("Running %s on %s", operation, src) 97 cmd = [ 98 "puffin", 99 "--operation={}".format(operation), 100 "--src_file={}".format(src), 101 "--dst_file={}".format(tgt_file.name), 102 ] 103 if subprocess.call(cmd) != 0: 104 raise Error( 105 "Puffin failed to do {} command: {}".format( 106 operation, cmd 107 ) 108 ) 109 110 if not filecmp.cmp(src, tgt_file.name): 111 raise Error( 112 "The generated file {} is not equivalent to the " 113 "original file {} after {} operation.".format( 114 tgt_file.name, src, operation 115 ) 116 ) 117 118 if _PUFFDIFF not in args.disabled_tests: 119 # Run puffdiff and puffpatch for each pairs of files in the corpus. 120 for src in files: 121 for tgt in files: 122 with tempfile.NamedTemporaryFile() as patch, \ 123 tempfile.NamedTemporaryFile() as new_tgt: 124 operation = "puffdiff" 125 logging.debug( 126 "Running %s on %s (%d) and %s (%d)", 127 operation, 128 os.path.basename(src), 129 os.stat(src).st_size, 130 os.path.basename(tgt), 131 os.stat(tgt).st_size, 132 ) 133 cmd = [ 134 "puffin", 135 "--operation={}".format(operation), 136 "--src_file={}".format(src), 137 "--dst_file={}".format(tgt), 138 "--patch_file={}".format(patch.name), 139 ] 140 141 # Running the puffdiff operation 142 if subprocess.call(cmd) != 0: 143 raise Error( 144 "Puffin failed to do {} command: {}".format( 145 operation, cmd 146 ) 147 ) 148 149 logging.debug( 150 "Patch size is: %d", os.stat(patch.name).st_size 151 ) 152 153 operation = "puffpatch" 154 logging.debug( 155 "Running %s on src file %s and patch %s", 156 operation, 157 os.path.basename(src), 158 patch.name, 159 ) 160 cmd = [ 161 "puffin", 162 "--operation={}".format(operation), 163 "--src_file={}".format(src), 164 "--dst_file={}".format(new_tgt.name), 165 "--patch_file={}".format(patch.name), 166 ] 167 if args.cache_size: 168 cmd += ["--cache_size={}".format(args.cache_size)] 169 170 # Running the puffpatch operation 171 if subprocess.call(cmd) != 0: 172 raise Error( 173 "Puffin failed to do {} command: {}".format( 174 operation, cmd 175 ) 176 ) 177 178 if not filecmp.cmp(tgt, new_tgt.name): 179 raise Error( 180 "The generated file {} is not equivalent to the " 181 "original file {} after puffpatch " 182 "operation.".format( 183 new_tgt.name, tgt 184 ) 185 ) 186 187 return 0 188 189 190if __name__ == "__main__": 191 sys.exit(main(sys.argv)) 192