summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xxlate-test.py145
1 files changed, 144 insertions, 1 deletions
diff --git a/xlate-test.py b/xlate-test.py
index bfcddde0..f3fcd797 100755
--- a/xlate-test.py
+++ b/xlate-test.py
@@ -57,8 +57,92 @@ def test_one_xlate(name, sourceline, expected, result):
return True
+def test_one_replay(name, sourceline, expected, result):
+ global args
+
+ searchline = None
+ if sourceline.find(';') >= 0:
+ sourceline, searchline = sourceline.split(';')
+
+ srcwords = sourceline.split()
+
+ srccmd = srcwords[0]
+ table_idx = -1
+ chain_idx = -1
+ table_name = "filter"
+ chain_name = None
+ for idx in range(1, len(srcwords)):
+ if srcwords[idx] in ["-A", "-I", "--append", "--insert"]:
+ chain_idx = idx
+ chain_name = srcwords[idx + 1]
+ elif srcwords[idx] in ["-t", "--table"]:
+ table_idx = idx
+ table_name = srcwords[idx + 1]
+
+ if not chain_name:
+ return True # nothing to do?
+
+ if searchline is None:
+ # adjust sourceline as required
+ srcwords[chain_idx] = "-A"
+ if table_idx >= 0:
+ srcwords.pop(table_idx)
+ srcwords.pop(table_idx)
+ searchline = " ".join(srcwords[1:])
+ elif not searchline.startswith("-A"):
+ tmp = ["-A", chain_name]
+ if len(searchline) > 0:
+ tmp.extend(searchline)
+ searchline = " ".join(tmp)
+
+ fam = ""
+ if srccmd.startswith("ip6"):
+ fam = "ip6 "
+ elif srccmd.startswith("ebt"):
+ fam = "bridge "
+ nft_input = [
+ "flush ruleset",
+ "add table " + fam + table_name,
+ "add chain " + fam + table_name + " " + chain_name
+ ] + [ l.removeprefix("nft ") for l in expected.split("\n") ]
+
+ # feed input via the pipe to make sure the shell "does its thing"
+ cmd = "echo \"" + "\n".join(nft_input) + "\" | " + args.nft + " -f -"
+ rc, output, error = run_proc(cmd, shell = True)
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append(args.nft + " call failed: " + error.rstrip('\n'))
+ for line in nft_input:
+ result.append(magenta("input: ") + line)
+ return False
+
+ ipt = srccmd.split('-')[0]
+ rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"])
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append(ipt + "-save call failed: " + error)
+ return False
+
+ if output.find(searchline) < 0:
+ outline = None
+ for l in output.split('\n'):
+ if l.startswith('-A '):
+ output = l
+ break
+ result.append(name + ": " + red("Replay fail"))
+ result.append(magenta("src: '") + expected + "'")
+ result.append(magenta("exp: '") + searchline + "'")
+ for l in output.split('\n'):
+ result.append(magenta("res: ") + l)
+ return False
+
+ return True
+
+
def run_test(name, payload):
global xtables_nft_multi
+ global args
+
test_passed = True
tests = passed = failed = errors = 0
result = []
@@ -69,7 +153,10 @@ def run_test(name, payload):
line = payload.readline()
continue
- sourceline = line
+ sourceline = replayline = line.rstrip("\n")
+ if line.find(';') >= 0:
+ sourceline = line.split(';')[0]
+
expected = payload.readline().rstrip(" \n")
next_expected = payload.readline()
if next_expected.startswith("nft"):
@@ -84,6 +171,20 @@ def run_test(name, payload):
else:
errors += 1
test_passed = False
+ continue
+
+ if args.replay:
+ tests += 1
+ if test_one_replay(name, replayline, expected, result):
+ passed += 1
+ else:
+ errors += 1
+ test_passed = False
+
+ rc, output, error = run_proc([args.nft, "flush", "ruleset"])
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append("nft flush ruleset call failed: " + error)
if (passed == tests) and not args.test:
print(name + ": " + green("OK"))
@@ -106,8 +207,44 @@ def load_test_files():
return (test_files, total_tests, total_passed, total_failed, total_error)
+def spawn_netns():
+ # prefer unshare module
+ try:
+ import unshare
+ unshare.unshare(unshare.CLONE_NEWNET)
+ return True
+ except:
+ pass
+
+ # sledgehammer style:
+ # - call ourselves prefixed by 'unshare -n' if found
+ # - pass extra --no-netns parameter to avoid another recursion
+ try:
+ import shutil
+
+ unshare = shutil.which("unshare")
+ if unshare is None:
+ return False
+
+ sys.argv.append("--no-netns")
+ os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv)
+ except:
+ pass
+
+ return False
+
+
def main():
global xtables_nft_multi
+
+ if args.replay:
+ if os.getuid() != 0:
+ print("Replay test requires root, sorry", file=sys.stderr)
+ return
+ if not args.no_netns and not spawn_netns():
+ print("Cannot run in own namespace, connectivity might break",
+ file=sys.stderr)
+
if not args.host:
os.putenv("XTABLES_LIBDIR", os.path.abspath("extensions"))
xtables_nft_multi = os.path.abspath(os.path.curdir) \
@@ -139,6 +276,12 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', action='store_true',
help='Run tests against installed binaries')
+parser.add_argument('-R', '--replay', action='store_true',
+ help='Replay tests to check iptables-nft parser')
+parser.add_argument('-n', '--nft', type=str, default='nft',
+ help='Replay using given nft binary (default: \'%(default)s\')')
+parser.add_argument('--no-netns', action='store_true',
+ help='Do not run testsuite in own network namespace')
parser.add_argument("test", nargs="?", help="run only the specified test file")
args = parser.parse_args()
sys.exit(main())