xref: /aosp_15_r20/external/puffin/scripts/test_corpus.py (revision 07fb1d065b7cfb4729786fadd42a612532d2f466)
1#!/usr/bin/env python3
2# Copyright 2018 The ChromiumOS Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""A tool for running puffin tests in a corpus of deflate compressed files."""
7
8import argparse
9import filecmp
10import logging
11import os
12import subprocess
13import sys
14import tempfile
15
16
# Identifiers of the individual tests this script knows how to run.
_PUFFHUFF = "puffhuff"
_PUFFDIFF = "puffdiff"

# Every test identifier accepted by the --disabled_tests option.
TESTS = (
    _PUFFHUFF,
    _PUFFDIFF,
)
20
21
class Error(Exception):
    """Raised when a puffin invocation or a corpus check fails."""
24
25
def ParseArguments(argv):
    """Parses and validates command line arguments.

    Args:
        argv: command line arguments to parse, without the program name.

    Returns:
        The argparse.Namespace holding the parsed values: corpus,
        disabled_tests, cache_size and debug.

    Raises:
        Error: if the corpus argument is not an existing directory.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "corpus",
        metavar="CORPUS",
        help="A corpus directory containing compressed files",
    )
    parser.add_argument(
        "-d",
        "--disabled_tests",
        default=(),
        metavar="",
        nargs="*",
        help=(
            "Space separated list of tests to disable. "
            "Allowed options include: " + ", ".join(TESTS)
        ),
        choices=TESTS,
    )
    parser.add_argument(
        "--cache_size",
        type=int,
        metavar="SIZE",
        help="The size (in bytes) of the cache for puffpatch operations.",
    )
    parser.add_argument(
        "--debug", action="store_true", help="Turns on verbosity."
    )

    # argparse itself reports malformed options and exits; only the checks it
    # cannot express are done by hand below.
    args = parser.parse_args(argv)

    if not os.path.isdir(args.corpus):
        raise Error(
            "Corpus directory {} is non-existent or inaccessible".format(
                args.corpus
            )
        )
    return args
74
75
def _RunPuffin(operation, flags):
    """Runs a single puffin operation and fails loudly if it does not succeed.

    Args:
        operation: the value passed to puffin's --operation flag.
        flags: list of the remaining command line flags for this invocation.

    Raises:
        Error: if puffin exits with a non-zero status.
    """
    cmd = ["puffin", "--operation={}".format(operation)] + flags
    if subprocess.call(cmd) != 0:
        raise Error(
            "Puffin failed to do {} command: {}".format(operation, cmd)
        )


def main(argv):
    """Runs the enabled puffin tests over every file in the corpus.

    Args:
        argv: the full command line, including the program name at argv[0].

    Returns:
        0 when every enabled test passes.

    Raises:
        Error: if a puffin invocation fails or a regenerated file differs
            from the file it is expected to reproduce.
    """
    args = ParseArguments(argv[1:])

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Only regular files directly inside the corpus are tested.
    # os.listdir() returns entries in arbitrary order, so sort them to make
    # runs (and the first reported failure) reproducible.
    candidates = (
        os.path.join(args.corpus, name)
        for name in sorted(os.listdir(args.corpus))
    )
    files = [path for path in candidates if os.path.isfile(path)]

    # For each file in corpus run puffhuff.
    if _PUFFHUFF not in args.disabled_tests:
        operation = "puffhuff"
        for src in files:
            with tempfile.NamedTemporaryFile() as tgt_file:
                logging.debug("Running %s on %s", operation, src)
                _RunPuffin(
                    operation,
                    [
                        "--src_file={}".format(src),
                        "--dst_file={}".format(tgt_file.name),
                    ],
                )

                # shallow=False forces a byte-by-byte comparison; the default
                # shallow mode accepts files whose stat signatures match
                # without ever reading their contents.
                if not filecmp.cmp(src, tgt_file.name, shallow=False):
                    raise Error(
                        "The generated file {} is not equivalent to the "
                        "original file {} after {} operation.".format(
                            tgt_file.name, src, operation
                        )
                    )

    if _PUFFDIFF not in args.disabled_tests:
        # Run puffdiff and puffpatch for each pair of files in the corpus
        # (every ordered pair, including a file paired with itself).
        for src in files:
            for tgt in files:
                with tempfile.NamedTemporaryFile() as patch, \
                        tempfile.NamedTemporaryFile() as new_tgt:
                    operation = "puffdiff"
                    logging.debug(
                        "Running %s on %s (%d) and %s (%d)",
                        operation,
                        os.path.basename(src),
                        os.stat(src).st_size,
                        os.path.basename(tgt),
                        os.stat(tgt).st_size,
                    )
                    # Generate the patch that turns src into tgt.
                    _RunPuffin(
                        operation,
                        [
                            "--src_file={}".format(src),
                            "--dst_file={}".format(tgt),
                            "--patch_file={}".format(patch.name),
                        ],
                    )

                    logging.debug(
                        "Patch size is: %d", os.stat(patch.name).st_size
                    )

                    operation = "puffpatch"
                    logging.debug(
                        "Running %s on src file %s and patch %s",
                        operation,
                        os.path.basename(src),
                        patch.name,
                    )
                    flags = [
                        "--src_file={}".format(src),
                        "--dst_file={}".format(new_tgt.name),
                        "--patch_file={}".format(patch.name),
                    ]
                    # Compare against None so that an explicit
                    # --cache_size=0 is forwarded instead of being dropped.
                    if args.cache_size is not None:
                        flags.append(
                            "--cache_size={}".format(args.cache_size)
                        )

                    # Apply the patch to src; the result must reproduce tgt.
                    _RunPuffin(operation, flags)

                    if not filecmp.cmp(tgt, new_tgt.name, shallow=False):
                        raise Error(
                            "The generated file {} is not equivalent to the "
                            "original file {} after puffpatch "
                            "operation.".format(
                                new_tgt.name, tgt
                            )
                        )

    return 0
188
189
# Run the tests only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv))
192