summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xiptables-test.py160
1 files changed, 159 insertions, 1 deletions
diff --git a/iptables-test.py b/iptables-test.py
index b5a70e44..d9b3ee4e 100755
--- a/iptables-test.py
+++ b/iptables-test.py
@@ -222,6 +222,153 @@ def variant_res(res, variant, alt_res=None):
return alt_res
return res_inverse[res]
+def fast_run_possible(filename):
+ '''
+ Keep things simple, run only for simple test files:
+ - no external commands
+ - no multiple tables
+ - no variant-specific results
+ '''
+ table = None
+ rulecount = 0
+ for line in open(filename):
+ if line[0] in ["#", ":"] or len(line.strip()) == 0:
+ continue
+ if line[0] == "*":
+ if table or rulecount > 0:
+ return False
+ table = line.rstrip()[1:]
+ if line[0] in ["@", "%"]:
+ return False
+ if len(line.split(";")) > 3:
+ return False
+ rulecount += 1
+
+ return True
+
+def run_test_file_fast(iptables, filename, netns):
+ '''
+ Run a test file, but fast
+
+ :param filename: name of the file with the test rules
+ :param netns: network namespace to perform test run in
+ '''
+
+ f = open(filename)
+
+ rules = {}
+ table = "filter"
+ chain_array = []
+ tests = 0
+
+ for lineno, line in enumerate(f):
+ if line[0] == "#" or len(line.strip()) == 0:
+ continue
+
+ if line[0] == "*":
+ table = line.rstrip()[1:]
+ continue
+
+ if line[0] == ":":
+ chain_array = line.rstrip()[1:].split(",")
+ continue
+
+ if len(chain_array) == 0:
+ return -1
+
+ tests += 1
+
+ for chain in chain_array:
+ item = line.split(";")
+ rule = chain + " " + item[0]
+
+ if item[1] == "=":
+ rule_save = chain + " " + item[0]
+ else:
+ rule_save = chain + " " + item[1]
+
+ res = item[2].rstrip()
+ if res != "OK":
+ rule = chain + " -t " + table + " " + item[0]
+ ret = run_test(iptables, rule, rule_save,
+ res, filename, lineno + 1, netns)
+
+ if ret < 0:
+ return -1
+ continue
+
+ if not chain in rules.keys():
+ rules[chain] = []
+ rules[chain].append((rule, rule_save))
+
+ restore_data = ["*" + table]
+ out_expect = []
+ for chain in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]:
+ if not chain in rules.keys():
+ continue
+ for rule in rules[chain]:
+ restore_data.append("-A " + rule[0])
+ out_expect.append("-A " + rule[1])
+ restore_data.append("COMMIT")
+
+ out_expect = "\n".join(out_expect)
+
+ # load all rules via iptables_restore
+
+ command = EXECUTABLE + " " + iptables + "-restore"
+ if netns:
+ command = "ip netns exec " + netns + " " + command
+
+ for line in restore_data:
+ print(iptables + "-restore: " + line, file=log_file)
+
+ proc = subprocess.Popen(command, shell = True, text = True,
+ stdin = subprocess.PIPE,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+ restore_data = "\n".join(restore_data) + "\n"
+ out, err = proc.communicate(input = restore_data)
+
+ if proc.returncode == -11:
+ reason = iptables + "-restore segfaults: " + cmd
+ print_error(reason, filename, lineno)
+ return -1
+
+ if proc.returncode != 0:
+ print("%s-restore returned %d: %s" % (iptables, proc.returncode, err),
+ file=log_file)
+ return -1
+
+ # find all rules in iptables_save output
+
+ command = EXECUTABLE + " " + iptables + "-save"
+ if netns:
+ command = "ip netns exec " + netns + " " + command
+
+ proc = subprocess.Popen(command, shell = True,
+ stdin = subprocess.PIPE,
+ stdout = subprocess.PIPE,
+ stderr = subprocess.PIPE)
+ out, err = proc.communicate()
+
+ if proc.returncode == -11:
+ reason = iptables + "-save segfaults: " + cmd
+ print_error(reason, filename, lineno)
+ return -1
+
+ cmd = iptables + " -F -t " + table
+ execute_cmd(cmd, filename, 0, netns)
+
+ out = out.decode('utf-8').rstrip()
+ if out.find(out_expect) < 0:
+ msg = ["dumps differ!"]
+ msg.extend(["expect: " + l for l in out_expect.split("\n")])
+ msg.extend(["got: " + l for l in out.split("\n")
+ if not l[0] in ['*', ':', '#']])
+ print("\n".join(msg), file=log_file)
+ return -1
+
+ return tests
def run_test_file(filename, netns):
'''
@@ -256,6 +403,14 @@ def run_test_file(filename, netns):
# default to iptables if not known prefix
iptables = IPTABLES
+ fast_failed = False
+ if fast_run_possible(filename):
+ tests = run_test_file_fast(iptables, filename, netns)
+ if tests > 0:
+ print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY))
+ return tests, tests
+ fast_failed = True
+
f = open(filename)
tests = 0
@@ -330,7 +485,10 @@ def run_test_file(filename, netns):
if netns:
execute_cmd("ip netns del " + netns, filename)
if total_test_passed:
- print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY))
+ suffix = ""
+ if fast_failed:
+ suffix = maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY)
+ print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY) + suffix)
f.close()
return tests, passed