diff options
Diffstat (limited to 'iptables-test.py')
-rwxr-xr-x | iptables-test.py | 160 |
1 files changed, 159 insertions, 1 deletions
diff --git a/iptables-test.py b/iptables-test.py index b5a70e44..d9b3ee4e 100755 --- a/iptables-test.py +++ b/iptables-test.py @@ -222,6 +222,153 @@ def variant_res(res, variant, alt_res=None): return alt_res return res_inverse[res] +def fast_run_possible(filename): + ''' + Keep things simple, run only for simple test files: + - no external commands + - no multiple tables + - no variant-specific results + ''' + table = None + rulecount = 0 + for line in open(filename): + if line[0] in ["#", ":"] or len(line.strip()) == 0: + continue + if line[0] == "*": + if table or rulecount > 0: + return False + table = line.rstrip()[1:] + if line[0] in ["@", "%"]: + return False + if len(line.split(";")) > 3: + return False + rulecount += 1 + + return True + +def run_test_file_fast(iptables, filename, netns): + ''' + Run a test file, but fast + + :param filename: name of the file with the test rules + :param netns: network namespace to perform test run in + ''' + + f = open(filename) + + rules = {} + table = "filter" + chain_array = [] + tests = 0 + + for lineno, line in enumerate(f): + if line[0] == "#" or len(line.strip()) == 0: + continue + + if line[0] == "*": + table = line.rstrip()[1:] + continue + + if line[0] == ":": + chain_array = line.rstrip()[1:].split(",") + continue + + if len(chain_array) == 0: + return -1 + + tests += 1 + + for chain in chain_array: + item = line.split(";") + rule = chain + " " + item[0] + + if item[1] == "=": + rule_save = chain + " " + item[0] + else: + rule_save = chain + " " + item[1] + + res = item[2].rstrip() + if res != "OK": + rule = chain + " -t " + table + " " + item[0] + ret = run_test(iptables, rule, rule_save, + res, filename, lineno + 1, netns) + + if ret < 0: + return -1 + continue + + if not chain in rules.keys(): + rules[chain] = [] + rules[chain].append((rule, rule_save)) + + restore_data = ["*" + table] + out_expect = [] + for chain in ["PREROUTING", "INPUT", "FORWARD", "OUTPUT", "POSTROUTING"]: + if not chain in rules.keys(): + continue + for rule in rules[chain]: + restore_data.append("-A " + rule[0]) + out_expect.append("-A " + rule[1]) + restore_data.append("COMMIT") + + out_expect = "\n".join(out_expect) + + # load all rules via iptables_restore + + command = EXECUTABLE + " " + iptables + "-restore" + if netns: + command = "ip netns exec " + netns + " " + command + + for line in restore_data: + print(iptables + "-restore: " + line, file=log_file) + + proc = subprocess.Popen(command, shell = True, text = True, + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + restore_data = "\n".join(restore_data) + "\n" + out, err = proc.communicate(input = restore_data) + + if proc.returncode == -11: + reason = iptables + "-restore segfaults: " + cmd + print_error(reason, filename, lineno) + return -1 + + if proc.returncode != 0: + print("%s-restore returned %d: %s" % (iptables, proc.returncode, err), + file=log_file) + return -1 + + # find all rules in iptables_save output + + command = EXECUTABLE + " " + iptables + "-save" + if netns: + command = "ip netns exec " + netns + " " + command + + proc = subprocess.Popen(command, shell = True, + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + out, err = proc.communicate() + + if proc.returncode == -11: + reason = iptables + "-save segfaults: " + cmd + print_error(reason, filename, lineno) + return -1 + + cmd = iptables + " -F -t " + table + execute_cmd(cmd, filename, 0, netns) + + out = out.decode('utf-8').rstrip() + if out.find(out_expect) < 0: + msg = ["dumps differ!"] + msg.extend(["expect: " + l for l in out_expect.split("\n")]) + msg.extend(["got: " + l for l in out.split("\n") + if not l[0] in ['*', ':', '#']]) + print("\n".join(msg), file=log_file) + return -1 + + return tests def run_test_file(filename, netns): ''' @@ -256,6 +403,14 @@ def run_test_file(filename, netns): # default to iptables if not known prefix iptables = IPTABLES + fast_failed = False + if fast_run_possible(filename): + tests = run_test_file_fast(iptables, filename, netns) + if tests > 0: + print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY)) + return tests, tests + fast_failed = True + f = open(filename) tests = 0 @@ -330,7 +485,10 @@ def run_test_file(filename, netns): if netns: execute_cmd("ip netns del " + netns, filename) if total_test_passed: - print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY)) + suffix = "" + if fast_failed: + suffix = maybe_colored('red', " but fast mode failed!", STDOUT_IS_TTY) + print(filename + ": " + maybe_colored('green', "OK", STDOUT_IS_TTY) + suffix) f.close() return tests, passed |