From 223e34b057b95604f07c53e984b199c56140e309 Mon Sep 17 00:00:00 2001 From: Phil Sutter Date: Fri, 21 Oct 2022 12:15:21 +0200 Subject: tests: xlate-test: Replay results for reverse direction testing Call nft with translation output as input, then check xtables-save output to make sure iptables-nft can handle anything it suggests nft to turn its ruleset into. This extends the test case syntax to cover for expected asymmetries. When the existing syntax was something like this: | | | [ The new syntax then is: | [;] | | [] To keep things terse, may omit the obligatory '-A ' argument. If missing, is sanitized for how it would appear in xtables-save output: '-I' is converted into '-A' and an optional table spec is removed. Since replay mode has to manipulate the ruleset in-kernel, abort if called by unprivileged user. Also try to run in own net namespace to reduce collateral damage. Signed-off-by: Phil Sutter --- xlate-test.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 144 insertions(+), 1 deletion(-) (limited to 'xlate-test.py') diff --git a/xlate-test.py b/xlate-test.py index bfcddde0..f3fcd797 100755 --- a/xlate-test.py +++ b/xlate-test.py @@ -57,8 +57,92 @@ def test_one_xlate(name, sourceline, expected, result): return True +def test_one_replay(name, sourceline, expected, result): + global args + + searchline = None + if sourceline.find(';') >= 0: + sourceline, searchline = sourceline.split(';') + + srcwords = sourceline.split() + + srccmd = srcwords[0] + table_idx = -1 + chain_idx = -1 + table_name = "filter" + chain_name = None + for idx in range(1, len(srcwords)): + if srcwords[idx] in ["-A", "-I", "--append", "--insert"]: + chain_idx = idx + chain_name = srcwords[idx + 1] + elif srcwords[idx] in ["-t", "--table"]: + table_idx = idx + table_name = srcwords[idx + 1] + + if not chain_name: + return True # nothing to do? + + if searchline is None: + # adjust sourceline as required + srcwords[chain_idx] = "-A" + if table_idx >= 0: + srcwords.pop(table_idx) + srcwords.pop(table_idx) + searchline = " ".join(srcwords[1:]) + elif not searchline.startswith("-A"): + tmp = ["-A", chain_name] + if len(searchline) > 0: + tmp.extend(searchline) + searchline = " ".join(tmp) + + fam = "" + if srccmd.startswith("ip6"): + fam = "ip6 " + elif srccmd.startswith("ebt"): + fam = "bridge " + nft_input = [ + "flush ruleset", + "add table " + fam + table_name, + "add chain " + fam + table_name + " " + chain_name + ] + [ l.removeprefix("nft ") for l in expected.split("\n") ] + + # feed input via the pipe to make sure the shell "does its thing" + cmd = "echo \"" + "\n".join(nft_input) + "\" | " + args.nft + " -f -" + rc, output, error = run_proc(cmd, shell = True) + if rc != 0: + result.append(name + ": " + red("Fail")) + result.append(args.nft + " call failed: " + error.rstrip('\n')) + for line in nft_input: + result.append(magenta("input: ") + line) + return False + + ipt = srccmd.split('-')[0] + rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"]) + if rc != 0: + result.append(name + ": " + red("Fail")) + result.append(ipt + "-save call failed: " + error) + return False + + if output.find(searchline) < 0: + outline = None + for l in output.split('\n'): + if l.startswith('-A '): + output = l + break + result.append(name + ": " + red("Replay fail")) + result.append(magenta("src: '") + expected + "'") + result.append(magenta("exp: '") + searchline + "'") + for l in output.split('\n'): + result.append(magenta("res: ") + l) + return False + + return True + + def run_test(name, payload): global xtables_nft_multi + global args + test_passed = True tests = passed = failed = errors = 0 result = [] @@ -69,7 +153,10 @@ def run_test(name, payload): line = payload.readline() continue - sourceline = line + sourceline = replayline = line.rstrip("\n") + if line.find(';') >= 0: + sourceline = line.split(';')[0] + expected = payload.readline().rstrip(" \n") next_expected = payload.readline() if next_expected.startswith("nft"): @@ -84,6 +171,20 @@ def run_test(name, payload): else: errors += 1 test_passed = False + continue + + if args.replay: + tests += 1 + if test_one_replay(name, replayline, expected, result): + passed += 1 + else: + errors += 1 + test_passed = False + + rc, output, error = run_proc([args.nft, "flush", "ruleset"]) + if rc != 0: + result.append(name + ": " + red("Fail")) + result.append("nft flush ruleset call failed: " + error) if (passed == tests) and not args.test: print(name + ": " + green("OK")) @@ -106,8 +207,44 @@ def load_test_files(): return (test_files, total_tests, total_passed, total_failed, total_error) +def spawn_netns(): + # prefer unshare module + try: + import unshare + unshare.unshare(unshare.CLONE_NEWNET) + return True + except: + pass + + # sledgehammer style: + # - call ourselves prefixed by 'unshare -n' if found + # - pass extra --no-netns parameter to avoid another recursion + try: + import shutil + + unshare = shutil.which("unshare") + if unshare is None: + return False + + sys.argv.append("--no-netns") + os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv) + except: + pass + + return False + + def main(): global xtables_nft_multi + + if args.replay: + if os.getuid() != 0: + print("Replay test requires root, sorry", file=sys.stderr) + return + if not args.no_netns and not spawn_netns(): + print("Cannot run in own namespace, connectivity might break", + file=sys.stderr) + if not args.host: os.putenv("XTABLES_LIBDIR", os.path.abspath("extensions")) xtables_nft_multi = os.path.abspath(os.path.curdir) \ @@ -139,6 +276,12 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument('-H', '--host', action='store_true', help='Run tests against installed binaries') +parser.add_argument('-R', '--replay', action='store_true', + help='Replay tests to check iptables-nft parser') +parser.add_argument('-n', '--nft', type=str, default='nft', + help='Replay using given nft binary (default: \'%(default)s\')') +parser.add_argument('--no-netns', action='store_true', + help='Do not run testsuite in own network namespace') parser.add_argument("test", nargs="?", help="run only the specified test file") args = parser.parse_args() sys.exit(main()) -- cgit v1.2.3