xref: /aosp_15_r20/external/iptables/xlate-test.py (revision a71a954618bbadd4a345637e5edcf36eec826889)
1*a71a9546SAutomerger Merge Worker#!/usr/bin/env python3
2*a71a9546SAutomerger Merge Worker# encoding: utf-8
3*a71a9546SAutomerger Merge Worker
4*a71a9546SAutomerger Merge Workerimport os
5*a71a9546SAutomerger Merge Workerimport sys
6*a71a9546SAutomerger Merge Workerimport shlex
7*a71a9546SAutomerger Merge Workerimport argparse
8*a71a9546SAutomerger Merge Workerfrom subprocess import Popen, PIPE
9*a71a9546SAutomerger Merge Worker
def run_proc(args, shell = False, input = None):
    """Run a command to completion and hand back (rc, stdout, stderr).

    stdin, stdout and stderr of the child are all connected to pipes and
    handled in text mode; *input*, if given, is written to the child's stdin.
    """
    pipes = {"stdin": PIPE, "stdout": PIPE, "stderr": PIPE}
    child = Popen(args, shell = shell, text = True, **pipes)
    stdout_text, stderr_text = child.communicate(input)
    return (child.returncode, stdout_text, stderr_text)
16*a71a9546SAutomerger Merge Worker
# Commands a test line in a .txlate file has to start with.
keywords = ("iptables-translate", "ip6tables-translate", "ebtables-translate")
# Multi-call binary that gets executed for every command.
xtables_nft_multi = 'xtables-nft-multi'

# ANSI escape sequences for colored output; every entry collapses to an
# empty string when stdout is not a terminal.
colors = {
    "magenta": "\033[95m",
    "green": "\033[92m",
    "yellow": "\033[93m",
    "red": "\033[91m",
    "end": "\033[0m",
}
if not sys.stdout.isatty():
    colors = {color: "" for color in colors}
25*a71a9546SAutomerger Merge Worker
26*a71a9546SAutomerger Merge Worker
def magenta(string):
    """Return *string* framed by the magenta and reset color sequences."""
    start, reset = colors["magenta"], colors["end"]
    return start + string + reset
29*a71a9546SAutomerger Merge Worker
30*a71a9546SAutomerger Merge Worker
def red(string):
    """Return *string* framed by the red and reset color sequences."""
    start, reset = colors["red"], colors["end"]
    return start + string + reset
33*a71a9546SAutomerger Merge Worker
34*a71a9546SAutomerger Merge Worker
def yellow(string):
    """Return *string* framed by the yellow and reset color sequences."""
    start, reset = colors["yellow"], colors["end"]
    return start + string + reset
37*a71a9546SAutomerger Merge Worker
38*a71a9546SAutomerger Merge Worker
def green(string):
    """Return *string* framed by the green and reset color sequences."""
    start, reset = colors["green"], colors["end"]
    return start + string + reset
41*a71a9546SAutomerger Merge Worker
42*a71a9546SAutomerger Merge Worker
def test_one_xlate(name, sourceline, expected, result):
    """Translate one source line and compare the output with *expected*.

    The multi-call binary is run with the shell-split words of *sourceline*.
    Diagnostics are appended to the *result* list. Returns True only if the
    command exits with status 0 and its output, stripped of trailing spaces
    and newlines, equals *expected*.
    """
    command = [xtables_nft_multi] + shlex.split(sourceline)
    status, stdout_text, stderr_text = run_proc(command)
    if status != 0:
        result.append(name + ": " + red("Error: ") + "iptables-translate failure")
        result.append(stderr_text)
        return False

    translation = stdout_text.rstrip(" \n")
    if translation == expected:
        return True

    # mismatch: record source, expectation and actual output
    result.append(name + ": " + red("Fail"))
    result.append(magenta("src: ") + sourceline.rstrip(" \n"))
    result.append(magenta("exp: ") + expected)
    result.append(magenta("res: ") + translation + "\n")
    return False
59*a71a9546SAutomerger Merge Worker
60*a71a9546SAutomerger Merge Workerdef test_one_replay(name, sourceline, expected, result):
61*a71a9546SAutomerger Merge Worker    global args
62*a71a9546SAutomerger Merge Worker
63*a71a9546SAutomerger Merge Worker    searchline = None
64*a71a9546SAutomerger Merge Worker    if sourceline.find(';') >= 0:
65*a71a9546SAutomerger Merge Worker        sourceline, searchline = sourceline.split(';')
66*a71a9546SAutomerger Merge Worker
67*a71a9546SAutomerger Merge Worker    srcwords = shlex.split(sourceline)
68*a71a9546SAutomerger Merge Worker
69*a71a9546SAutomerger Merge Worker    srccmd = srcwords[0]
70*a71a9546SAutomerger Merge Worker    ipt = srccmd.split('-')[0]
71*a71a9546SAutomerger Merge Worker    table_idx = -1
72*a71a9546SAutomerger Merge Worker    chain_idx = -1
73*a71a9546SAutomerger Merge Worker    table_name = "filter"
74*a71a9546SAutomerger Merge Worker    chain_name = None
75*a71a9546SAutomerger Merge Worker    for idx in range(1, len(srcwords)):
76*a71a9546SAutomerger Merge Worker        if srcwords[idx] in ["-A", "-I", "--append", "--insert"]:
77*a71a9546SAutomerger Merge Worker            chain_idx = idx
78*a71a9546SAutomerger Merge Worker            chain_name = srcwords[idx + 1]
79*a71a9546SAutomerger Merge Worker        elif srcwords[idx] in ["-t", "--table"]:
80*a71a9546SAutomerger Merge Worker            table_idx = idx
81*a71a9546SAutomerger Merge Worker            table_name = srcwords[idx + 1]
82*a71a9546SAutomerger Merge Worker
83*a71a9546SAutomerger Merge Worker    if not chain_name:
84*a71a9546SAutomerger Merge Worker        return True     # nothing to do?
85*a71a9546SAutomerger Merge Worker
86*a71a9546SAutomerger Merge Worker    if searchline is None:
87*a71a9546SAutomerger Merge Worker        # adjust sourceline as required
88*a71a9546SAutomerger Merge Worker        checkcmd = srcwords[:]
89*a71a9546SAutomerger Merge Worker        checkcmd[0] = ipt
90*a71a9546SAutomerger Merge Worker        checkcmd[chain_idx] = "--check"
91*a71a9546SAutomerger Merge Worker    else:
92*a71a9546SAutomerger Merge Worker        checkcmd = [ipt, "-t", table_name]
93*a71a9546SAutomerger Merge Worker        checkcmd += ["--check", chain_name, searchline]
94*a71a9546SAutomerger Merge Worker
95*a71a9546SAutomerger Merge Worker    fam = ""
96*a71a9546SAutomerger Merge Worker    if srccmd.startswith("ip6"):
97*a71a9546SAutomerger Merge Worker        fam = "ip6 "
98*a71a9546SAutomerger Merge Worker    elif srccmd.startswith("ebt"):
99*a71a9546SAutomerger Merge Worker        fam = "bridge "
100*a71a9546SAutomerger Merge Worker
101*a71a9546SAutomerger Merge Worker    expected = [ l.removeprefix("nft ").strip(" '") for l in expected.split("\n") ]
102*a71a9546SAutomerger Merge Worker    nft_input = [
103*a71a9546SAutomerger Merge Worker            "flush ruleset",
104*a71a9546SAutomerger Merge Worker            "add table " + fam + table_name,
105*a71a9546SAutomerger Merge Worker            "add chain " + fam + table_name + " " + chain_name,
106*a71a9546SAutomerger Merge Worker    ] + expected
107*a71a9546SAutomerger Merge Worker
108*a71a9546SAutomerger Merge Worker    rc, output, error = run_proc([args.nft, "-f", "-"], shell = False, input = "\n".join(nft_input))
109*a71a9546SAutomerger Merge Worker    if rc != 0:
110*a71a9546SAutomerger Merge Worker        result.append(name + ": " + red("Replay Fail"))
111*a71a9546SAutomerger Merge Worker        result.append(args.nft + " call failed: " + error.rstrip('\n'))
112*a71a9546SAutomerger Merge Worker        for line in nft_input:
113*a71a9546SAutomerger Merge Worker            result.append(magenta("input: ") + line)
114*a71a9546SAutomerger Merge Worker        return False
115*a71a9546SAutomerger Merge Worker
116*a71a9546SAutomerger Merge Worker    rc, output, error = run_proc([xtables_nft_multi] + checkcmd)
117*a71a9546SAutomerger Merge Worker    if rc != 0:
118*a71a9546SAutomerger Merge Worker        result.append(name + ": " + red("Check Fail"))
119*a71a9546SAutomerger Merge Worker        result.append(magenta("check: ") + " ".join(checkcmd))
120*a71a9546SAutomerger Merge Worker        result.append(magenta("error: ") + error)
121*a71a9546SAutomerger Merge Worker        rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"])
122*a71a9546SAutomerger Merge Worker        for l in output.split("\n"):
123*a71a9546SAutomerger Merge Worker            result.append(magenta("ipt: ") + l)
124*a71a9546SAutomerger Merge Worker        rc, output, error = run_proc([args.nft, "list", "ruleset"])
125*a71a9546SAutomerger Merge Worker        for l in output.split("\n"):
126*a71a9546SAutomerger Merge Worker            result.append(magenta("nft: ") + l)
127*a71a9546SAutomerger Merge Worker        return False
128*a71a9546SAutomerger Merge Worker
129*a71a9546SAutomerger Merge Worker    return True
130*a71a9546SAutomerger Merge Worker
131*a71a9546SAutomerger Merge Worker
def run_test(name, payload):
    """Run all tests contained in one open .txlate file.

    A test starts with a line beginning with one of the known translate
    commands. The line after it holds the expected translation; if the line
    after that starts with "nft", it is appended as a second expected line.
    All other lines are skipped.

    Every test is translated and, if replay mode is enabled, replayed as
    well (followed by a ruleset flush). A failed translation skips the
    replay of that test.

    *name* is used as prefix in messages, *payload* is the open test file.
    Returns the tuple (tests, passed, failed, errors).
    """
    tests = passed = failed = errors = 0
    result = []

    line = payload.readline()
    while line:
        if not line.startswith(keywords):
            line = payload.readline()
            continue

        sourceline = replayline = line.rstrip("\n")
        if line.find(';') >= 0:
            # translation only gets the part in front of the search rule
            sourceline = line.split(';')[0]

        expected = payload.readline().rstrip(" \n")
        next_expected = payload.readline()
        if next_expected.startswith("nft"):
            expected += "\n" + next_expected.rstrip(" \n")
            line = payload.readline()
        else:
            # not part of this test, examine it in the next iteration
            line = next_expected

        tests += 1
        if test_one_xlate(name, sourceline, expected, result):
            passed += 1
        else:
            errors += 1
            continue

        if args.replay:
            tests += 1
            if test_one_replay(name, replayline, expected, result):
                passed += 1
            else:
                errors += 1

            rc, output, error = run_proc([args.nft, "flush", "ruleset"])
            if rc != 0:
                result.append(name + ": " + red("Fail"))
                result.append("nft flush ruleset call failed: " + error)

    if passed == tests:
        print(name + ": " + green("OK"))
    # Print whatever was collected, so that a failing ruleset flush is
    # reported even if all tests themselves passed.
    if result:
        print("\n".join(result), file=sys.stderr)
    return tests, passed, failed, errors
184*a71a9546SAutomerger Merge Worker
185*a71a9546SAutomerger Merge Worker
def load_test_files():
    """Run every *.txlate file found in the "extensions" directory.

    Files are processed in sorted order. Returns the tuple
    (files, tests, passed, failed, errors) summed up over all of them.
    """
    file_count = 0
    sum_tests = sum_passed = sum_failed = sum_errors = 0
    for entry in sorted(os.listdir("extensions")):
        if not entry.endswith(".txlate"):
            continue
        path = 'extensions/' + entry
        with open(path, "r") as payload:
            n_tests, n_passed, n_failed, n_errors = run_test(path, payload)
        file_count += 1
        sum_tests += n_tests
        sum_passed += n_passed
        sum_failed += n_failed
        sum_errors += n_errors
    return (file_count, sum_tests, sum_passed, sum_failed, sum_errors)
198*a71a9546SAutomerger Merge Worker
199*a71a9546SAutomerger Merge Worker
def spawn_netns():
    """Try to move the test run into a network namespace of its own.

    Returns True if the current process entered a new namespace by means of
    the "unshare" Python module. Otherwise the script is re-executed through
    the "unshare" binary, in which case this function does not return.
    Returns False if neither approach worked.
    """
    # prefer unshare module
    try:
        import unshare
        unshare.unshare(unshare.CLONE_NEWNET)
        return True
    except Exception:
        # module not available or the call failed, try the binary instead
        pass

    # sledgehammer style:
    # - call ourselves prefixed by 'unshare -n' if found
    # - pass extra --no-netns parameter to avoid another recursion
    try:
        import shutil

        unshare_bin = shutil.which("unshare")
        if unshare_bin is None:
            return False

        # Build a new argument list instead of appending to sys.argv, so a
        # failing exec leaves the latter untouched.
        os.execv(unshare_bin,
                 [unshare_bin, "-n", sys.executable] + sys.argv + ["--no-netns"])
    except Exception:
        # best effort only, the caller prints a warning if we return False
        pass

    return False
225*a71a9546SAutomerger Merge Worker
226*a71a9546SAutomerger Merge Worker
def main():
    """Run the requested test files and print a summary line.

    Uses the parsed command line in the module-level "args". In replay mode
    root privileges are required, and unless disabled the run is moved into
    a separate network namespace first. Unless testing the installed
    binaries, the extensions and the multi-call binary below the current
    directory are used. Without any test file given, all files in the
    "extensions" directory are run.

    Returns None if replay mode was requested without being root, 99 if a
    given test file could not be opened, otherwise the number of passed
    tests minus the number of tests run (0 if everything passed).
    """
    global xtables_nft_multi

    if args.replay:
        if os.getuid() != 0:
            print("Replay test requires root, sorry", file=sys.stderr)
            return
        if not args.no_netns and not spawn_netns():
            print("Cannot run in own namespace, connectivity might break",
                  file=sys.stderr)

    if not args.host:
        # Assigning to os.environ also updates the process environment the
        # child processes inherit.
        os.environ["XTABLES_LIBDIR"] = os.path.abspath("extensions")
        xtables_nft_multi = os.path.abspath(os.path.curdir) \
                            + '/iptables/' + xtables_nft_multi

    files = tests = passed = failed = errors = 0
    for test in args.test:
        if not test.endswith(".txlate"):
            test += ".txlate"
        # Guard the open() call only: an error raised while running the
        # tests (e.g. a missing binary) is not a missing test file.
        try:
            payload = open(test, "r")
        except IOError:
            print(red("Error: ") + "test file does not exist", file=sys.stderr)
            return 99
        with payload:
            t, p, f, e = run_test(test, payload)
        files += 1
        tests += t
        passed += p
        failed += f
        errors += e

    if files == 0:
        files, tests, passed, failed, errors = load_test_files()

    if files > 1:
        file_word = "files"
    else:
        file_word = "file"
    print("%d test %s, %d tests, %d tests passed, %d tests failed, %d errors"
            % (files, file_word, tests, passed, failed, errors))
    return passed - tests
269*a71a9546SAutomerger Merge Worker
270*a71a9546SAutomerger Merge Worker
# Command line interface of the test runner.
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', action='store_true',
                    help='Run tests against installed binaries')
parser.add_argument('-R', '--replay', action='store_true',
                    help='Replay tests to check iptables-nft parser')
parser.add_argument('-n', '--nft', type=str, default='nft',
                    help='Replay using given nft binary (default: \'%(default)s\')')
parser.add_argument('--no-netns', action='store_true',
                    help='Do not run testsuite in own network namespace')
parser.add_argument("test", nargs="*", help="run only the specified test file(s)")

# Parse the command line and run the tests only when executed as a script.
# "args" is still bound at module level, which is where main() and the test
# functions look it up.
if __name__ == "__main__":
    args = parser.parse_args()
    sys.exit(main())
283