diff options
author | Arturo Borrero <arturo.borrero.glez@gmail.com> | 2015-12-11 11:10:14 +0100 |
---|---|---|
committer | Pablo Neira Ayuso <pablo@netfilter.org> | 2015-12-15 21:36:14 +0100 |
commit | 6b29a5bebb957387fe1aac8fcbfd431e6be237f0 (patch) | |
tree | 7e0be595ee6acfbdd2611045a7d227d5f329a16e /tests/regression/nft-test.py | |
parent | 564b0e7c13f98b9ad054058137b4c0bb61bd46b4 (diff) |
tests/: rearrange tests directory
Rearrange the directory to obtain a better organization of files and
tests-suites.
We end with a tree like this:
tests
|
.--- py
.--- shell
.--- files
This was suggested by Pablo.
Signed-off-by: Arturo Borrero Gonzalez <arturo.borrero.glez@gmail.com>
Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
Diffstat (limited to 'tests/regression/nft-test.py')
-rwxr-xr-x | tests/regression/nft-test.py | 968 |
1 files changed, 0 insertions, 968 deletions
diff --git a/tests/regression/nft-test.py b/tests/regression/nft-test.py deleted file mode 100755 index e68087f3..00000000 --- a/tests/regression/nft-test.py +++ /dev/null @@ -1,968 +0,0 @@ -#!/usr/bin/python -# -# (C) 2014 by Ana Rey Botello <anarey@gmail.com> -# -# Based on iptables-test.py: -# (C) 2012 by Pablo Neira Ayuso <pablo@netfilter.org>" -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# Thanks to the Outreach Program for Women (OPW) for sponsoring this test -# infrastructure. - -import sys -import os -import subprocess -import argparse -import signal - -TERMINAL_PATH = os.getcwd() -NFT_BIN = "src/nft" -TESTS_PATH = os.path.dirname(os.path.abspath(__file__)) -TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"] -LOGFILE = "/tmp/nftables-test.log" -log_file = None -table_list = [] -chain_list = [] -all_set = dict() -signal_received = 0 - - -class Colors: - if sys.stdout.isatty(): - HEADER = '\033[95m' - GREEN = '\033[92m' - YELLOW = '\033[93m' - RED = '\033[91m' - ENDC = '\033[0m' - else: - HEADER = '' - GREEN = '' - YELLOW = '' - RED = '' - ENDC = '' - -def print_msg(reason, filename=None, lineno=None, color=None, errstr=None): - ''' - Prints a message with nice colors, indicating file and line number. - ''' - if filename and lineno: - print (filename + ": " + color + "ERROR:" + - Colors.ENDC + " line %d: %s" % (lineno + 1, reason)) - else: - print (color + "ERROR:" + Colors.ENDC + " %s" % (reason)) - -def print_error(reason, filename=None, lineno=None): - print_msg(reason, filename, lineno, Colors.RED, "ERROR:") - -def print_warning(reason, filename=None, lineno=None): - print_msg(reason, filename, lineno, Colors.YELLOW, "WARNING:") - - -def print_differences_warning(filename, lineno, rule1, rule2, cmd): - reason = "'" + rule1 + "' mismatches '" + rule2 + "'" - print filename + ": " + Colors.YELLOW + "WARNING: " + Colors.ENDC + \ - "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason - - -def print_differences_error(filename, lineno, output, cmd): - reason = "Listing is broken." - print filename + ": " + Colors.RED + "ERROR: " + Colors.ENDC + \ - "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason - - -def table_exist(table, filename, lineno): - ''' - Exists a table. - ''' - cmd = NFT_BIN + " list -nnn table " + table[0] + " " + table[1] - ret = execute_cmd(cmd, filename, lineno) - - return True if (ret == 0) else False - - -def table_flush(table, filename, lineno): - ''' - Flush a table. - ''' - cmd = NFT_BIN + " flush table " + str(table[0]) + " " + str(table[1]) - ret = execute_cmd(cmd, filename, lineno) - - return cmd - - -def table_create(table, filename, lineno): - ''' - Adds a table. - ''' - ## We check if table exists. - if table_exist(table, filename, lineno): - reason = "Table " + table[1] + " already exists" - print_error(reason, filename, lineno) - return -1 - - table_list.append(table) - - ## We add a new table - cmd = NFT_BIN + " add table " + table[0] + " " + table[1] - ret = execute_cmd(cmd, filename, lineno) - - if ret != 0: - reason = "Cannot add table " + table[1] - print_error(reason, filename, lineno) - table_list.remove(table) - return -1 - - ## We check if table was added correctly. - if not table_exist(table, filename, lineno): - table_list.remove(table) - reason = "I have just added the table " + table[1] + \ - " but it does not exist. Giving up!" - print_error(reason, filename, lineno) - return -1 - - return 0 - - -def table_delete(table, filename=None, lineno=None): - ''' - Deletes a table. - ''' - table_info = " " + table[0] + " " + table[1] + " " - - if not table_exist(table, filename, lineno): - reason = "Table " + table[1] + \ - " does not exist but I added it before." - print_error(reason, filename, lineno) - return -1 - - cmd = NFT_BIN + " delete table" + table_info - ret = execute_cmd(cmd, filename, lineno) - if ret != 0: - reason = cmd + ": " \ - "I cannot delete table '" + table[1] + "'. Giving up! " - print_error(reason, filename, lineno) - return -1 - - if table_exist(table, filename, lineno): - reason = "I have just deleted the table " + table[1] + \ - " but the table still exists." - print_error(reason, filename, lineno) - return -1 - - return 0 - - -def chain_exist(chain, table, filename, lineno): - ''' - Checks a chain - ''' - - table_info = " " + table[0] + " " + table[1] + " " - cmd = NFT_BIN + " list -nnn chain" + table_info + chain - ret = execute_cmd(cmd, filename, lineno) - - return True if (ret == 0) else False - - -def chain_create(chain, chain_type, chain_list, table, filename, lineno): - ''' - Adds a chain - ''' - - table_info = " " + table[0] + " " + table[1] + " " - - if chain_exist(chain, table, filename, lineno): - reason = "This chain '" + chain + "' exists in " + table[1] + "." + \ - "I cannot create two chains with same name." - print_error(reason, filename, lineno) - return -1 - - if chain_type: - cmd = NFT_BIN + " add chain" + table_info + chain + "\{ " + chain_type + "\; \}" - else: - cmd = NFT_BIN + " add chain" + table_info + chain - - ret = execute_cmd(cmd, filename, lineno) - if ret != 0: - reason = "I cannot create the chain '" + chain - print_error(reason, filename, lineno) - return -1 - - if not chain in chain_list: - chain_list.append(chain) - - if not chain_exist(chain, table, filename, lineno): - reason = "I have added the chain '" + chain + \ - "' but it does not exist in " + table[1] - print_error(reason, filename, lineno) - return -1 - - return 0 - - -def chain_delete(chain, table, filename=None, lineno=None): - ''' - Flushes and deletes a chain. - ''' - - table_info = " " + table[0] + " " + table[1] + " " - - if not chain_exist(chain, table, filename, lineno): - reason = "The chain " + chain + " does not exists in " + table[1] + \ - ". I cannot delete it." - print_error(reason, filename, lineno) - return -1 - - cmd = NFT_BIN + " flush chain" + table_info + chain - ret = execute_cmd(cmd, filename, lineno) - if ret != 0: - reason = "I cannot flush this chain " + chain - print_error(reason, filename, lineno) - return -1 - - cmd = NFT_BIN + " delete chain" + table_info + chain - ret = execute_cmd(cmd, filename, lineno) - if ret != 0: - reason = cmd + "I cannot delete this chain. DD" - print_error(reason, filename, lineno) - return -1 - - if chain_exist(chain, table, filename, lineno): - reason = "The chain " + chain + " exists in " + table[1] + \ - ". I cannot delete this chain" - print_error(reason, filename, lineno) - return -1 - - return 0 - - -def set_add(set_info, table_list, filename, lineno): - ''' - Adds a set. - ''' - - if not table_list: - reason = "Missing table to add rule" - print_error(reason, filename, lineno) - return -1 - - for table in table_list: - if set_exist(set_info[0], table, filename, lineno): - reason = "This set " + set_info + " exists in " + table[1] + \ - ". I cannot add it again" - print_error(reason, filename, lineno) - return -1 - - table_info = " " + table[0] + " " + table[1] + " " - set_text = " " + set_info[0] + " { type " + set_info[1] + " \;}" - cmd = NFT_BIN + " add set" + table_info + set_text - ret = execute_cmd(cmd, filename, lineno) - - if (ret == 0 and set_info[2].rstrip() == "fail") or \ - (ret != 0 and set_info[2].rstrip() == "ok"): - reason = cmd + ": " + "I cannot add the set " + set_info[0] - print_error(reason, filename, lineno) - return -1 - - if not set_exist(set_info[0], table, filename, lineno): - reason = "I have just added the set " + set_info[0] + \ - " to the table " + table[1] + " but it does not exist" - print_error(reason, filename, lineno) - return -1 - - return 0 - - -def set_add_elements(set_element, set_name, set_all, state, table_list, - filename, lineno): - ''' - Adds elements to the set. - ''' - - if not table_list: - reason = "Missing table to add rules" - print_error(reason, filename, lineno) - return -1 - - for table in table_list: - # Check if set exists. - if (not set_exist(set_name, table, filename, lineno) or - not set_name in set_all) and state == "ok": - reason = "I cannot add an element to the set " + set_name + \ - " since it does not exist." - print_error(reason, filename, lineno) - return -1 - - table_info = " " + table[0] + " " + table[1] + " " - - element = "" - for e in set_element: - if not element: - element = e - else: - element = element + ", " + e - - set_text = set_name + " { " + element + " }" - cmd = NFT_BIN + " add element" + table_info + set_text - ret = execute_cmd(cmd, filename, lineno) - - if (state == "fail" and ret == 0) or (state == "ok" and ret != 0): - test_state = "This rule should have failed." - reason = cmd + ": " + test_state - print_error(reason, filename, lineno) - return -1 - - # Add element into a all_set. - if (ret == 0 and state == "ok"): - for e in set_element: - set_all[set_name].add(e) - - return 0 - - -def set_delete_elements(set_element, set_name, table, filename=None, - lineno=None): - ''' - Deletes elements in a set. - ''' - table_info = " " + table[0] + " " + table[1] + " " - - for element in set_element: - set_text = set_name + " {" + element + "}" - cmd = NFT_BIN + " delete element" + table_info + set_text - ret = execute_cmd(cmd, filename, lineno) - if ret != 0: - reason = "I cannot delete an element" + element + \ - " from the set '" + set_name - print_error(reason, filename, lineno) - return -1 - - return 0 - - -def set_delete(all_set, table, filename=None, lineno=None): - ''' - Deletes set and its content. - ''' - - for set_name in all_set.keys(): - # Check if exists the set - if not set_exist(set_name, table, filename, lineno): - reason = "The set " + set_name + \ - " does not exist, I cannot delete it" - print_error(reason, filename, lineno) - return -1 - - # We delete all elements in the set - set_delete_elements(all_set[set_name], set_name, table, filename, - lineno) - - # We delete the set. - table_info = " " + table[0] + " " + table[1] + " " - cmd = NFT_BIN + " delete set " + table_info + " " + set_name - ret = execute_cmd(cmd, filename, lineno) - - # Check if the set still exists after I deleted it. - if ret != 0 or set_exist(set_name, table, filename, lineno): - reason = "Cannot remove the set " + set_name - print_error(reason, filename, lineno) - return -1 - - return 0 - - -def set_exist(set_name, table, filename, lineno): - ''' - Check if the set exists. - ''' - table_info = " " + table[0] + " " + table[1] + " " - cmd = NFT_BIN + " list -nnn set" + table_info + set_name - ret = execute_cmd(cmd, filename, lineno) - - return True if (ret == 0) else False - - -def set_check_element(rule1, rule2): - ''' - Check if element exists in anonymous sets. - ''' - ret = -1 - pos1 = rule1.find("{") - pos2 = rule2.find("{") - end1 = rule1.find("}") - end2 = rule2.find("}") - - if ((pos1 != -1) and (pos2 != -1) and (end1 != -1) and (end2 != -1)): - list1 = (rule1[pos1 + 1:end1].replace(" ", "")).split(",") - list2 = (rule2[pos2 + 1:end2].replace(" ", "")).split(",") - list1.sort() - list2.sort() - if (cmp(list1, list2) == 0): - ret = 0 - return ret - - -def output_clean(pre_output, chain): - pos_chain = pre_output[0].find(chain) - if pos_chain == -1: - return "" - output_intermediate = pre_output[0][pos_chain:] - brace_start = output_intermediate.find("{") - brace_end = output_intermediate.find("}") - pre_rule = output_intermediate[brace_start:brace_end] - if pre_rule[1:].find("{") > -1: # this rule has a set. - set = pre_rule[1:].replace("\t", "").replace("\n", "").strip() - set = set.split(";")[2].strip() + "}" - return set - else: - rule = pre_rule.split(";")[2].replace("\t", "").replace("\n", "").strip() - if len(rule) < 0: - return "" - return rule - -def payload_check_elems_to_set(elems): - newset = set() - - for n, line in enumerate(elems.split('[end]')): - e = line.strip() - if e in newset: - print_error("duplicate", e, n) - return newset - - newset.add(e) - - return newset - -def payload_check_set_elems(want, got): - - if want.find('element') < 0 or want.find('[end]') < 0: - return 0 - - if got.find('element') < 0 or got.find('[end]') < 0: - return 0 - - set_want = payload_check_elems_to_set(want) - set_got = payload_check_elems_to_set(got) - - return set_want == set_got - -def payload_check(payload_buffer, file, cmd): - - file.seek(0, 0) - - ret = False - i = 0 - - for lineno, want_line in enumerate(payload_buffer): - line = file.readline() - - if want_line == line: - i += 1 - continue - - if want_line.find('[') < 0 and line.find('[') < 0: - continue - if want_line.find(']') < 0 and line.find(']') < 0: - continue - - if payload_check_set_elems(want_line, line): - continue - - print_differences_warning(file.name, lineno, want_line.strip(), line.strip(), cmd); - return 0 - - return i > 0 - -def rule_add(rule, table_list, chain_list, filename, lineno, - force_all_family_option, filename_path): - ''' - Adds a rule - ''' - # TODO Check if a rule is added correctly. - ret = warning = error = unit_tests = 0 - - if not table_list or not chain_list: - reason = "Missing table or chain to add rule." - print_error(reason, filename, lineno) - return [-1, warning, error, unit_tests] - - payload_expected = [] - - for table in table_list: - try: - payload_log = open("%s.payload.%s" % (filename_path, table[0])) - except (IOError): - payload_log = open("%s.payload" % filename_path) - - if rule[1].strip() == "ok": - try: - payload_expected.index(rule[0]) - except (ValueError): - payload_expected = payload_find_expected(payload_log, rule[0]) - - if payload_expected == []: - print_error("did not find payload information for rule '%s'" % rule[0], payload_log.name, 1) - - for chain in chain_list: - unit_tests += 1 - table_flush(table, filename, lineno) - table_info = " " + table[0] + " " + table[1] + " " - cmd = NFT_BIN + " add rule" + table_info + chain + " " + rule[0] - - payload_log = os.tmpfile(); - - cmd = NFT_BIN + " add rule --debug=netlink" + table_info + chain + " " + rule[0] - ret = execute_cmd(cmd, filename, lineno, payload_log) - - state = rule[1].rstrip() - if (ret == 0 and state == "fail") or (ret != 0 and state == "ok"): - if state == "fail": - test_state = "This rule should have failed." - else: - test_state = "This rule should not have failed." - reason = cmd + ": " + test_state - print_error(reason, filename, lineno) - ret = -1 - error += 1 - if not force_all_family_option: - return [ret, warning, error, unit_tests] - - if (state == "fail" and ret != 0): - ret = 0 - continue - - if ret == 0: - # Check for matching payload - if state == "ok" and not payload_check(payload_expected, payload_log, cmd): - error += 1 - gotf = open("%s.payload.got" % filename_path, 'a') - payload_log.seek(0, 0) - gotf.write("# %s\n" % rule[0]) - while True: - line = payload_log.readline() - if line == "": - break - gotf.write(line) - gotf.close() - print_warning("Wrote payload for rule %s" % rule[0], gotf.name, 1) - - # Check output of nft - process = subprocess.Popen([NFT_BIN, '-nnn', 'list', 'table'] + table, - shell=False, stdout=subprocess.PIPE, - preexec_fn=preexec) - pre_output = process.communicate() - output = pre_output[0].split(";") - if len(output) < 2: - reason = cmd + ": Listing is broken." - print_error(reason, filename, lineno) - ret = -1 - error += 1 - if not force_all_family_option: - return [ret, warning, error, unit_tests] - else: - rule_output = output_clean(pre_output, chain) - if (len(rule) == 3): - teoric_exit = rule[2] - else: - teoric_exit = rule[0] - - if (rule_output.rstrip() != teoric_exit.rstrip()): - if (rule[0].find("{") != -1): # anonymous sets - if (set_check_element(teoric_exit, rule_output) != 0): - warning += 1 - print_differences_warning(filename, lineno, - rule[0], rule_output, - cmd) - if not force_all_family_option: - return [ret, warning, error, unit_tests] - else: - if len(rule_output) <= 0: - error += 1 - print_differences_error(filename, lineno, - rule_output, cmd) - if not force_all_family_option: - return [ret, warning, error, unit_tests] - - warning += 1 - print_differences_warning(filename, lineno, - teoric_exit.rstrip(), rule_output, - cmd) - - if not force_all_family_option: - return [ret, warning, error, unit_tests] - - return [ret, warning, error, unit_tests] - - -def preexec(): - os.setpgrp() # Don't forward signals. - - -def cleanup_on_exit(): - for table in table_list: - for chain in chain_list: - ret = chain_delete(chain, table, "", "") - if all_set: - ret = set_delete(all_set, table) - ret = table_delete(table) - - -def signal_handler(signal, frame): - global signal_received - signal_received = 1 - - -def execute_cmd(cmd, filename, lineno, stdout_log = False): - ''' - Executes a command, checks for segfaults and returns the command exit - code. - - :param cmd: string with the command to be executed - :param filename: name of the file tested (used for print_error purposes) - :param lineno: line number being tested (used for print_error purposes) - ''' - global log_file - print >> log_file, "command: %s" % cmd - if debug_option: - print cmd - - if not stdout_log: - stdout_log = log_file - - ret = subprocess.call(cmd, shell=True, universal_newlines=True, - stderr=log_file, stdout=stdout_log, - preexec_fn=preexec) - log_file.flush() - - if ret == -11: - reason = "command segfaults: " + cmd - print_error(reason, filename, lineno) - - return ret - - -def print_result(filename, tests, warning, error): - return str(filename) + ": " + str(tests) + " unit tests, " + \ - str(error) + " error, " + str(warning) + " warning" - - -def print_result_all(filename, tests, warning, error, unit_tests): - return str(filename) + ": " + str(tests) + " unit tests, " +\ - str(unit_tests) + " total test executed, " + \ - str(error) + " error, " + \ - str(warning) + " warning" - - -def table_process(table_line, filename, lineno): - if ";" in table_line: - table_info = table_line.split(";") - else: - table_info.append("ip") - table_info.append(table_line) - - return table_create(table_info, filename, lineno) - - -def chain_process(chain_line, filename, lineno): - chain_name = chain_line[0] - chain_type = "" - for table in table_list: - if len(chain_line) > 1: - chain_type = chain_line[1] - ret = chain_create(chain_name, chain_type, chain_list, table, - filename, lineno) - if ret != 0: - return -1 - return ret - - -def set_process(set_line, filename, lineno): - set_info = [] - set_name = "".join(set_line[0].rstrip()[1:]) - set_info.append(set_name) - set_type = set_line[1].split(";")[0] - set_state = set_line[1].split(";")[1] # ok or fail - set_info.append(set_type) - set_info.append(set_state) - ret = set_add(set_info, table_list, filename, lineno) - if ret == 0: - all_set[set_name] = set() - - return ret - - -def set_element_process(element_line, filename, lineno): - rule_state = element_line[1] - set_name = element_line[0].split(" ")[0] - set_element = element_line[0].split(" ") - set_element.remove(set_name) - return set_add_elements(set_element, set_name, all_set, rule_state, - table_list, filename, lineno) - -def payload_find_expected(payload_log, rule): - ''' - Find the netlink payload that should be generated by given rule in payload_log - - :param payload_log: open file handle of the payload data - :param rule: nft rule we are going to add - ''' - found = 0 - pos = 0 - payload_buffer = [] - - while True: - line = payload_log.readline() - if not line: - break - - if line[0] == "#": # rule start - rule_line = line.strip()[2:] - - if rule_line == rule.strip(): - found = 1 - continue - - if found == 1: - payload_buffer.append(line) - if line.isspace(): - return payload_buffer - - payload_log.seek(0, 0) - return payload_buffer - -def run_test_file(filename, force_all_family_option, specific_file): - ''' - Runs a test file - - :param filename: name of the file with the test rules - ''' - - if specific_file: - filename_path = os.path.join(TERMINAL_PATH, filename) - else: - filename_path = os.path.join(TESTS_PATH, filename) - - f = open(filename_path) - tests = passed = total_unit_run = total_warning = total_error = 0 - table = "" - total_test_passed = True - - for lineno, line in enumerate(f): - if signal_received == 1: - print "\nSignal received. Cleaning up and Exitting..." - cleanup_on_exit() - sys.exit(0) - - if line.isspace(): - continue - - if line[0] == "#": # Command-line - continue - - if line[0] == '*': # Table - table_line = line.rstrip()[1:] - ret = table_process(table_line, filename, lineno) - if (ret != 0): - total_test_passed = False - break - continue - - if line[0] == ":": # Chain - chain_line = line.rstrip()[1:].split(";") - ret = chain_process(chain_line, filename, lineno) - if ret != 0: - total_test_passed = False - break - continue - - if line[0] == "!": # Adds this set - set_line = line.rstrip()[0:].split(" ") - ret = set_process(set_line, filename, lineno) - tests += 1 - if ret == -1: - total_test_passed = False - continue - passed += 1 - continue - - if line[0] == "?": # Adds elements in a set - element_line = line.rstrip()[1:].split(";") - ret = set_element_process(element_line, filename, lineno) - tests += 1 - if ret == -1: - total_test_passed = False - continue - - passed += 1 - continue - - # Rule - rule = line.split(';') # rule[1] Ok or FAIL - if len(rule) == 1 or len(rule) > 3 or rule[1].rstrip() not in {"ok", "fail"}: - reason = "Skipping malformed rule test. (" + line.rstrip('\n') + ")" - print_warning(reason, filename, lineno) - continue - - if line[0] == "-": # Run omitted lines - if need_fix_option: - rule[0] = rule[0].rstrip()[1:].strip() - else: - continue - elif need_fix_option: - continue - - result = rule_add(rule, table_list, chain_list, filename, lineno, - force_all_family_option, filename_path) - tests += 1 - ret = result[0] - warning = result[1] - total_warning += warning - total_error += result[2] - total_unit_run += result[3] - - if ret != 0: - total_test_passed = False - continue - - if warning == 0: # All ok. - passed += 1 - - # Delete rules, sets, chains and tables - for table in table_list: - # We delete chains - for chain in chain_list: - ret = chain_delete(chain, table, filename, lineno) - if ret != 0: - total_test_passed = False - - # We delete sets. - if all_set: - ret = set_delete(all_set, table, filename, lineno) - if ret != 0: - total_test_passed = False - reason = "There is a problem when we delete a set" - print_error(reason, filename, lineno) - - # We delete tables. - ret = table_delete(table, filename, lineno) - - if ret != 0: - total_test_passed = False - - if specific_file: - if force_all_family_option: - print print_result_all(filename, tests, total_warning, total_error, - total_unit_run) - else: - print print_result(filename, tests, total_warning, total_error) - else: - if (tests == passed and tests > 0): - print filename + ": " + Colors.GREEN + "OK" + Colors.ENDC - - f.close() - del table_list[:] - del chain_list[:] - all_set.clear() - - return [tests, passed, total_warning, total_error, total_unit_run] - - -def main(): - parser = argparse.ArgumentParser(description='Run nft tests', - version='1.0') - - parser.add_argument('filename', nargs='?', - metavar='path/to/file.t', - help='Run only this test') - - parser.add_argument('-d', '--debug', action='store_true', - dest='debug', - help='enable debugging mode') - - parser.add_argument('-e', '--need-fix', action='store_true', - dest='need_fix_line', - help='run rules that need a fix') - - parser.add_argument('-f', '--force-family', action='store_true', - dest='force_all_family', - help='keep testing all families on error') - - args = parser.parse_args() - global debug_option, need_fix_option - debug_option = args.debug - need_fix_option = args.need_fix_line - force_all_family_option = args.force_all_family - specific_file = False - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - if os.getuid() != 0: - print "You need to be root to run this, sorry" - return - - # Change working directory to repository root - os.chdir(TESTS_PATH + "/../..") - - if not os.path.isfile(NFT_BIN): - print "The nft binary does not exist. You need to build the project." - return - - test_files = files_ok = run_total = 0 - tests = passed = warnings = errors = 0 - global log_file - try: - log_file = open(LOGFILE, 'w') - except IOError: - print "Cannot open log file %s" % LOGFILE - return - - file_list = [] - if args.filename: - file_list = [args.filename] - specific_file = True - else: - for directory in TESTS_DIRECTORY: - path = os.path.join(TESTS_PATH, directory) - for root, dirs, files in os.walk(path): - for f in files: - if f.endswith(".t"): - file_list.append(os.path.join(directory, f)) - - for filename in file_list: - result = run_test_file(filename, force_all_family_option, specific_file) - file_tests = result[0] - file_passed = result[1] - file_warnings = result[2] - file_errors = result[3] - file_unit_run = result[4] - - test_files += 1 - - if file_warnings == 0 and file_tests == file_passed: - files_ok += 1 - if file_tests: - tests += file_tests - passed += file_passed - errors += file_errors - warnings += file_warnings - if force_all_family_option: - run_total += file_unit_run - - if test_files == 0: - print "No test files to run" - else: - if not specific_file: - if force_all_family_option: - print ("%d test files, %d files passed, %d unit tests, %d total executed, %d error, %d warning" % - (test_files, files_ok, tests, run_total, errors, warnings)) - else: - print ("%d test files, %d files passed, %d unit tests, %d error, %d warning" % - (test_files, files_ok, tests, errors, warnings)) - -if __name__ == '__main__': - main() |