#!/usr/bin/env python3
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A tool for running puffin tests in a corpus of deflate compressed files."""

import argparse
import filecmp
import logging
import os
import subprocess
import sys
import tempfile


_PUFFHUFF = "puffhuff"
_PUFFDIFF = "puffdiff"
TESTS = (_PUFFHUFF, _PUFFDIFF)


class Error(Exception):
    """Puffin general processing error."""


def ParseArguments(argv):
    """Parses and validates command line arguments.

    Args:
        argv: command line arguments to parse (without the program name).

    Returns:
        The argparse namespace holding the parsed arguments.

    Raises:
        Error: if the corpus argument is not an existing directory.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "corpus",
        metavar="CORPUS",
        help="A corpus directory containing compressed files",
    )
    parser.add_argument(
        "-d",
        "--disabled_tests",
        default=(),
        metavar="",
        nargs="*",
        help=(
            "Space separated list of tests to disable. "
            "Allowed options include: " + ", ".join(TESTS)
        ),
        choices=TESTS,
    )
    parser.add_argument(
        "--cache_size",
        type=int,
        metavar="SIZE",
        help="The size (in bytes) of the cache for puffpatch " "operations.",
    )
    parser.add_argument(
        "--debug", action="store_true", help="Turns on verbosity."
    )

    # Parse command-line arguments.
    args = parser.parse_args(argv)

    if not os.path.isdir(args.corpus):
        raise Error(
            "Corpus directory {} is non-existent or inaccesible".format(
                args.corpus
            )
        )
    return args


def _RunPuffin(operation, src_file, dst_file, patch_file=None, cache_size=None):
    """Runs one puffin operation and raises Error on a non-zero exit code.

    Args:
        operation: value for puffin's --operation flag.
        src_file: value for puffin's --src_file flag.
        dst_file: value for puffin's --dst_file flag.
        patch_file: value for --patch_file; the flag is omitted when None.
        cache_size: value for --cache_size; the flag is omitted when None.

    Raises:
        Error: if the puffin process exits with a non-zero status.
    """
    cmd = [
        "puffin",
        "--operation={}".format(operation),
        "--src_file={}".format(src_file),
        "--dst_file={}".format(dst_file),
    ]
    if patch_file is not None:
        cmd.append("--patch_file={}".format(patch_file))
    # Compare against None rather than truthiness so that an explicit size
    # of 0 is still forwarded instead of being silently dropped.
    if cache_size is not None:
        cmd.append("--cache_size={}".format(cache_size))

    if subprocess.call(cmd) != 0:
        raise Error(
            "Puffin failed to do {} command: {}".format(operation, cmd)
        )


def _TestPuffhuff(src):
    """Runs puffhuff on |src| and checks the output is identical to |src|.

    Raises:
        Error: if puffin fails or the generated file differs from |src|.
    """
    operation = _PUFFHUFF
    with tempfile.NamedTemporaryFile() as tgt_file:
        logging.debug("Running %s on %s", operation, src)
        _RunPuffin(operation, src, tgt_file.name)
        if not filecmp.cmp(src, tgt_file.name):
            raise Error(
                "The generated file {} is not equivalent to the "
                "original file {} after {} operation.".format(
                    tgt_file.name, src, operation
                )
            )


def _TestPuffdiff(src, tgt, cache_size=None):
    """Diffs |src| against |tgt|, applies the patch and verifies the result.

    Args:
        src: path of the source file.
        tgt: path of the target file the patch must reproduce.
        cache_size: optional cache size passed to the puffpatch operation.

    Raises:
        Error: if puffin fails or the patched file differs from |tgt|.
    """
    with tempfile.NamedTemporaryFile() as patch, \
            tempfile.NamedTemporaryFile() as new_tgt:
        operation = "puffdiff"
        logging.debug(
            "Running %s on %s (%d) and %s (%d)",
            operation,
            os.path.basename(src),
            os.stat(src).st_size,
            os.path.basename(tgt),
            os.stat(tgt).st_size,
        )
        _RunPuffin(operation, src, tgt, patch_file=patch.name)
        logging.debug("Patch size is: %d", os.stat(patch.name).st_size)

        operation = "puffpatch"
        logging.debug(
            "Running %s on src file %s and patch %s",
            operation,
            os.path.basename(src),
            patch.name,
        )
        _RunPuffin(
            operation,
            src,
            new_tgt.name,
            patch_file=patch.name,
            cache_size=cache_size,
        )

        if not filecmp.cmp(tgt, new_tgt.name):
            raise Error(
                "The generated file {} is not equivalent to the "
                "original file {} after puffpatch "
                "operation.".format(new_tgt.name, tgt)
            )


def main(argv):
    """The main function.

    Args:
        argv: full command line, including the program name at index 0.

    Returns:
        0 when every enabled test passes.

    Raises:
        Error: on invalid arguments, a failing puffin invocation, or a
            mismatch between a generated file and the expected file.
    """
    args = ParseArguments(argv[1:])

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Construct the list of regular files directly inside the corpus. The
    # list is sorted so the processing order is the same on every run.
    candidates = [
        os.path.join(args.corpus, name) for name in os.listdir(args.corpus)
    ]
    files = sorted(path for path in candidates if os.path.isfile(path))

    # For each file in corpus run puffhuff.
    if _PUFFHUFF not in args.disabled_tests:
        for src in files:
            _TestPuffhuff(src)

    # Run puffdiff and puffpatch for each pair of files in the corpus
    # (including each file paired with itself).
    if _PUFFDIFF not in args.disabled_tests:
        for src in files:
            for tgt in files:
                _TestPuffdiff(src, tgt, cache_size=args.cache_size)

    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))