From d196dccf1853039656f15c8da94ad349a3b7d07c Mon Sep 17 00:00:00 2001 From: Phil Sutter Date: Tue, 8 May 2018 13:08:45 +0200 Subject: tests/py: Support testing JSON input and output as well This extends nft-test.py by optional JSON testing capabilities, activated via '-j'/'--enable-json' parameter). JSON testing happens for all rules which are supposed to work: After a rule has been added and the existing tests (payload, ruleset listing output) have been performed, basically the same test is done again using a recorded JSON equivalent and (if necessary) a recorded listing output. The code tries to ease new test case creation overhead by auto-generating JSON equivalent input via listing the (non-JSON) rule in JSON format. Also, differing netlink debug and listing output are stored in *.got files to assist in analyzing/fixing failing test cases. Signed-off-by: Phil Sutter Signed-off-by: Pablo Neira Ayuso --- tests/py/nft-test.py | 186 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 181 insertions(+), 5 deletions(-) (limited to 'tests/py/nft-test.py') diff --git a/tests/py/nft-test.py b/tests/py/nft-test.py index 1e26d182..a41b95de 100755 --- a/tests/py/nft-test.py +++ b/tests/py/nft-test.py @@ -17,6 +17,7 @@ import sys import os import argparse import signal +import json TESTS_PATH = os.path.dirname(os.path.abspath(__file__)) TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"] @@ -669,6 +670,16 @@ def payload_check(payload_buffer, file, cmd): return i > 0 +def json_dump_normalize(json_string, human_readable = False): + json_obj = json.loads(json_string) + + if human_readable: + return json.dumps(json_obj, sort_keys = True, + indent = 4, separators = (',', ': ')) + else: + return json.dumps(json_obj, sort_keys = True) + + def rule_add(rule, filename, lineno, force_all_family_option, filename_path): ''' Adds a rule @@ -688,6 +699,39 @@ def rule_add(rule, filename, lineno, force_all_family_option, filename_path): except: payload_expected = None + if enable_json_option: + try: + json_log = open("%s.json" % filename_path) + json_input = json_find_expected(json_log, rule[0]) + except: + json_input = None + + if not json_input: + print_error("did not find JSON equivalent for rule '%s'" + % rule[0]) + else: + try: + json_input = json_dump_normalize(json_input) + except ValueError: + reason = "Invalid JSON syntax in rule: %s" % json_input + print_error(reason) + return [-1, warning, error, unit_tests] + + try: + json_log = open("%s.json.output" % filename_path) + json_expected = json_find_expected(json_log, rule[0]) + except: + # will use json_input for comparison + json_expected = None + + if json_expected: + try: + json_expected = json_dump_normalize(json_expected) + except ValueError: + reason = "Invalid JSON syntax in expected output: %s" % json_expected + print_error(reason) + return [-1, warning, error, unit_tests] + for table in table_list: if rule[1].strip() == "ok": table_payload_expected = None @@ -708,6 +752,7 @@ def rule_add(rule, filename, lineno, force_all_family_option, filename_path): payload_log = os.tmpfile() + # Add rule and check return code cmd = "add rule %s %s %s" % (table, chain, rule[0]) ret = execute_cmd(cmd, filename, lineno, payload_log, debug="netlink") @@ -747,7 +792,7 @@ def rule_add(rule, filename, lineno, force_all_family_option, filename_path): print_warning("Wrote payload for rule %s" % rule[0], gotf.name, 1) - # Check output of nft + # Check for matching ruleset listing numeric_old = nftables.set_numeric_output("all") stateless_old = nftables.set_stateless_output(True) list_cmd = 'list table %s' % table @@ -796,11 +841,106 @@ def rule_add(rule, filename, lineno, force_all_family_option, filename_path): if not force_all_family_option: return [ret, warning, error, unit_tests] - return [ret, warning, error, unit_tests] + if not enable_json_option: + continue + + # Generate JSON equivalent for rule if not found + if not json_input: + json_old = nftables.set_json_output(True) + rc, json_output, err = nftables.cmd(list_cmd) + nftables.set_json_output(json_old) + + json_output = json.loads(json_output) + for item in json_output["nftables"]: + if "rule" in item: + if "handle" in item["rule"]: + del(item["rule"]["handle"]) + if "position" in item["rule"]: + del(item["rule"]["position"]) + json_output = item["rule"] + break + json_input = json.dumps(json_output["expr"], sort_keys = True) + + gotf = open("%s.json.got" % filename_path, 'a') + jdump = json_dump_normalize(json_input, True) + gotf.write("# %s\n%s\n\n" % (rule[0], jdump)) + gotf.close() + print_warning("Wrote JSON equivalent for rule %s" % rule[0], + gotf.name, 1) + table_flush(table, filename, lineno) + payload_log = os.tmpfile() -def preexec(): - os.setpgrp() # Don't forward signals. + # Add rule in JSON format + cmd = json.dumps({ "nftables": [{ "add": { "rule": { + "family": table.family, + "table": table.name, + "chain": chain.name, + "expr": json.loads(json_input), + }}}]}) + + json_old = nftables.set_json_output(True) + ret = execute_cmd(cmd, filename, lineno, payload_log, debug="netlink") + nftables.set_json_output(json_old) + + if ret != 0: + reason = "Failed to add JSON equivalent rule" + print_error(reason, filename, lineno) + continue + + # Check for matching payload + if not payload_check(table_payload_expected, payload_log, cmd): + error += 1 + gotf = open("%s.json.payload.got" % filename_path, 'a') + payload_log.seek(0, 0) + gotf.write("# %s\n" % rule[0]) + while True: + line = payload_log.readline() + if line == "": + break + gotf.write(line) + gotf.close() + print_warning("Wrote JSON payload for rule %s" % rule[0], + gotf.name, 1) + + # Check for matching ruleset listing + numeric_old = nftables.set_numeric_output("all") + stateless_old = nftables.set_stateless_output(True) + json_old = nftables.set_json_output(True) + rc, json_output, err = nftables.cmd(list_cmd) + nftables.set_json_output(json_old) + nftables.set_numeric_output(numeric_old) + nftables.set_stateless_output(stateless_old) + + json_output = json.loads(json_output) + for item in json_output["nftables"]: + if "rule" in item: + if "handle" in item["rule"]: + del(item["rule"]["handle"]) + if "position" in item["rule"]: + del(item["rule"]["position"]) + json_output = item["rule"] + break + json_output = json.dumps(json_output["expr"], sort_keys = True) + + if not json_expected and json_output != json_input: + print_differences_warning(filename, lineno, + json_input, json_output, cmd) + error += 1 + gotf = open("%s.json.output.got" % filename_path, 'a') + jdump = json_dump_normalize(json_output, True) + gotf.write("# %s\n%s\n\n" % (rule[0], jdump)) + gotf.close() + print_warning("Wrote JSON output for rule %s" % rule[0], + gotf.name, 1) + # prevent further warnings and .got file updates + json_expected = json_output + elif json_expected and json_output != json_expected: + print_differences_warning(filename, lineno, + json_expected, json_output, cmd) + error += 1 + + return [ret, warning, error, unit_tests] def cleanup_on_exit(): @@ -986,6 +1126,37 @@ def payload_find_expected(payload_log, rule): return payload_buffer +def json_find_expected(json_log, rule): + ''' + Find the corresponding JSON for given rule + + :param json_log: open file handle of the json data + :param rule: nft rule we are going to add + ''' + found = 0 + json_buffer = "" + + while True: + line = json_log.readline() + if not line: + break + + if line[0] == "#": # rule start + rule_line = line.strip()[2:] + + if rule_line == rule.strip(): + found = 1 + continue + + if found == 1: + json_buffer += line.rstrip("\n").strip() + if line.isspace(): + return json_buffer + + json_log.seek(0, 0) + return json_buffer + + def run_test_file(filename, force_all_family_option, specific_file): ''' Runs a test file @@ -1139,11 +1310,16 @@ def main(): dest='force_all_family', help='keep testing all families on error') + parser.add_argument('-j', '--enable-json', action='store_true', + dest='enable_json', + help='test JSON functionality as well') + args = parser.parse_args() - global debug_option, need_fix_option + global debug_option, need_fix_option, enable_json_option debug_option = args.debug need_fix_option = args.need_fix_line force_all_family_option = args.force_all_family + enable_json_option = args.enable_json specific_file = False signal.signal(signal.SIGINT, signal_handler) -- cgit v1.2.3