summaryrefslogtreecommitdiffstats
path: root/xlate-test.py
diff options
context:
space:
mode:
authorPhil Sutter <phil@nwl.cc>2022-10-21 12:15:21 +0200
committerPhil Sutter <phil@nwl.cc>2022-11-11 19:14:28 +0100
commit223e34b057b95604f07c53e984b199c56140e309 (patch)
tree0ec4a2f11d2a89ec86ab5c34053a35f44d9fafc6 /xlate-test.py
parent595cad95fd2f61c6bc71e521ab58556f13648c30 (diff)
tests: xlate-test: Replay results for reverse direction testing
Call nft with translation output as input, then check xtables-save output to make sure iptables-nft can handle anything it suggests nft to turn its ruleset into. This extends the test case syntax to cover for expected asymmetries. When the existing syntax was something like this: | <xlate command> | <nft output1> | [<nft output2> The new syntax then is: | <xlate command>[;<replay rule part>] | <nft output1> | [<nft output2>] To keep things terse, <replay rule part> may omit the obligatory '-A <chain>' argument. If missing, <xlate command> is sanitized for how it would appear in xtables-save output: '-I' is converted into '-A' and an optional table spec is removed. Since replay mode has to manipulate the ruleset in-kernel, abort if called by unprivileged user. Also try to run in own net namespace to reduce collateral damage. Signed-off-by: Phil Sutter <phil@nwl.cc>
Diffstat (limited to 'xlate-test.py')
-rwxr-xr-xxlate-test.py145
1 files changed, 144 insertions, 1 deletions
diff --git a/xlate-test.py b/xlate-test.py
index bfcddde0..f3fcd797 100755
--- a/xlate-test.py
+++ b/xlate-test.py
@@ -57,8 +57,92 @@ def test_one_xlate(name, sourceline, expected, result):
return True
+def test_one_replay(name, sourceline, expected, result):
+ global args
+
+ searchline = None
+ if sourceline.find(';') >= 0:
+ sourceline, searchline = sourceline.split(';')
+
+ srcwords = sourceline.split()
+
+ srccmd = srcwords[0]
+ table_idx = -1
+ chain_idx = -1
+ table_name = "filter"
+ chain_name = None
+ for idx in range(1, len(srcwords)):
+ if srcwords[idx] in ["-A", "-I", "--append", "--insert"]:
+ chain_idx = idx
+ chain_name = srcwords[idx + 1]
+ elif srcwords[idx] in ["-t", "--table"]:
+ table_idx = idx
+ table_name = srcwords[idx + 1]
+
+ if not chain_name:
+ return True # nothing to do?
+
+ if searchline is None:
+ # adjust sourceline as required
+ srcwords[chain_idx] = "-A"
+ if table_idx >= 0:
+ srcwords.pop(table_idx)
+ srcwords.pop(table_idx)
+ searchline = " ".join(srcwords[1:])
+ elif not searchline.startswith("-A"):
+ tmp = ["-A", chain_name]
+ if len(searchline) > 0:
+ tmp.extend(searchline)
+ searchline = " ".join(tmp)
+
+ fam = ""
+ if srccmd.startswith("ip6"):
+ fam = "ip6 "
+ elif srccmd.startswith("ebt"):
+ fam = "bridge "
+ nft_input = [
+ "flush ruleset",
+ "add table " + fam + table_name,
+ "add chain " + fam + table_name + " " + chain_name
+ ] + [ l.removeprefix("nft ") for l in expected.split("\n") ]
+
+ # feed input via the pipe to make sure the shell "does its thing"
+ cmd = "echo \"" + "\n".join(nft_input) + "\" | " + args.nft + " -f -"
+ rc, output, error = run_proc(cmd, shell = True)
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append(args.nft + " call failed: " + error.rstrip('\n'))
+ for line in nft_input:
+ result.append(magenta("input: ") + line)
+ return False
+
+ ipt = srccmd.split('-')[0]
+ rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"])
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append(ipt + "-save call failed: " + error)
+ return False
+
+ if output.find(searchline) < 0:
+ outline = None
+ for l in output.split('\n'):
+ if l.startswith('-A '):
+ output = l
+ break
+ result.append(name + ": " + red("Replay fail"))
+ result.append(magenta("src: '") + expected + "'")
+ result.append(magenta("exp: '") + searchline + "'")
+ for l in output.split('\n'):
+ result.append(magenta("res: ") + l)
+ return False
+
+ return True
+
+
def run_test(name, payload):
global xtables_nft_multi
+ global args
+
test_passed = True
tests = passed = failed = errors = 0
result = []
@@ -69,7 +153,10 @@ def run_test(name, payload):
line = payload.readline()
continue
- sourceline = line
+ sourceline = replayline = line.rstrip("\n")
+ if line.find(';') >= 0:
+ sourceline = line.split(';')[0]
+
expected = payload.readline().rstrip(" \n")
next_expected = payload.readline()
if next_expected.startswith("nft"):
@@ -84,6 +171,20 @@ def run_test(name, payload):
else:
errors += 1
test_passed = False
+ continue
+
+ if args.replay:
+ tests += 1
+ if test_one_replay(name, replayline, expected, result):
+ passed += 1
+ else:
+ errors += 1
+ test_passed = False
+
+ rc, output, error = run_proc([args.nft, "flush", "ruleset"])
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append("nft flush ruleset call failed: " + error)
if (passed == tests) and not args.test:
print(name + ": " + green("OK"))
@@ -106,8 +207,44 @@ def load_test_files():
return (test_files, total_tests, total_passed, total_failed, total_error)
+def spawn_netns():
+ # prefer unshare module
+ try:
+ import unshare
+ unshare.unshare(unshare.CLONE_NEWNET)
+ return True
+ except:
+ pass
+
+ # sledgehammer style:
+ # - call ourselves prefixed by 'unshare -n' if found
+ # - pass extra --no-netns parameter to avoid another recursion
+ try:
+ import shutil
+
+ unshare = shutil.which("unshare")
+ if unshare is None:
+ return False
+
+ sys.argv.append("--no-netns")
+ os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv)
+ except:
+ pass
+
+ return False
+
+
def main():
global xtables_nft_multi
+
+ if args.replay:
+ if os.getuid() != 0:
+ print("Replay test requires root, sorry", file=sys.stderr)
+ return
+ if not args.no_netns and not spawn_netns():
+ print("Cannot run in own namespace, connectivity might break",
+ file=sys.stderr)
+
if not args.host:
os.putenv("XTABLES_LIBDIR", os.path.abspath("extensions"))
xtables_nft_multi = os.path.abspath(os.path.curdir) \
@@ -139,6 +276,12 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', action='store_true',
help='Run tests against installed binaries')
+parser.add_argument('-R', '--replay', action='store_true',
+ help='Replay tests to check iptables-nft parser')
+parser.add_argument('-n', '--nft', type=str, default='nft',
+ help='Replay using given nft binary (default: \'%(default)s\')')
+parser.add_argument('--no-netns', action='store_true',
+ help='Do not run testsuite in own network namespace')
parser.add_argument("test", nargs="?", help="run only the specified test file")
args = parser.parse_args()
sys.exit(main())