summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tests/regression/README141
-rwxr-xr-xtests/regression/nft-test.py859
2 files changed, 1000 insertions, 0 deletions
diff --git a/tests/regression/README b/tests/regression/README
new file mode 100644
index 00000000..7b66d229
--- /dev/null
+++ b/tests/regression/README
@@ -0,0 +1,141 @@
+Author: Ana Rey <anarey@gmail.com>
+Date: 18/Sept/2014
+
+Here, the automated regression testing for nftables and some test
+files.
+
+This script checks that the rule input and output of nft matches.
+More details here below.
+
+A) What is this testing?
+
+This script tests two different paths:
+
+* The rule input from the command-line. This checks the different steps
+ from the command line to the kernel. This includes the parsing,
+ evaluation and netlink generation steps.
+
+* The output listing that is obtained from the kernel. This checks the
+ different steps from the kernel to the command line: The netlink
+ message parsing, postprocess and textify steps to display the rule
+ listing.
+
+As a final step, this script compares that the rule that is added can
+be listed by nft.
+
+B) What options are available?
+
+The script offers the following options:
+
+* Execute test files:
+
+./nft-test.py # Run all test files
+./nft-test.py path/file.t # Run this test file
+
+If there is a problem, it shows the differences between the rule that
+is added and the rule that is listed by nft.
+
+In case you hit an error, the script doesn't keep testing for more
+families. Unless you specify the --force-family option.
+
+* Execute broken tests:
+
+./nft-test.sh -e
+
+This runs tests for rules that need a fix: This mode runs the lines that
+that start with a "-" symbol.
+
+* Debugging:
+
+./nft-test.sh -d
+
+This shows all the commands that the script executes, so you can watch
+its internal behaviour.
+
+* Keep testing all families on error.
+
+./nft-test.sh -f
+
+Don't stop testing for more families in case of error.
+
+C) What is the structure of the test file?
+
+A test file contains a set of rules that are added in the system.
+
+Here, an example of a test file:
+
+ *ip;test-ipv4 # line 1
+ *ip6;test-ipv6 # line 2
+ *inet;test-inet # line 3
+
+ :input;type filter hook input priority 0 # line 4
+
+ ah hdrlength != 11-23;ok;ah hdrlength < 11 ah hdrlength > 23 # line 5
+ - tcp dport != {22-25} # line 6
+
+ !set1 ipv4_addr;ok # line 7
+ ?set1 192.168.3.8 192.168.3.9;ok # line 8
+ # This is a commented-line. # line 9
+
+Line 1 defines a table. The name of the table is 'test-ip' and the
+family is ip. Lines 2 and 3 defines more tables for different families
+so the rules in this test file are also tested there.
+
+Line 4 defines the chain. The name of this chain is "input". The type is
+"filter", the hook is "input" and the priority is 0.
+
+Line 5 defines the rule, the ";" character is used as separator of several
+parts:
+
+* Part 1: "ah hdrlength != 11-23" is the rule to check.
+* Part 2: "ok" is the result expected with the execute of this rule.
+* Part 3: "ah hdrlength < 11 ah hdrlength > 23". This is the expected
+ output. You can leave this empty if the output is the same as the
+ input.
+
+Line 6 is a marked line. This means that this rule is tested if
+'-e' is passed as argument to nft-test.py.
+
+Line 7 adds a new set. The name of this set is "set1" and the type
+of this set is "ipv4_add".
+
+Line 8 adds two elements into the 'set1' set: "192.168.3.8" and
+"192.168.3.9". A whitespace separates the elements of the set.
+
+Line 9 uses the "#" symbol that means that this line is commented out.
+
+D) The test folders
+
+The test files are divided in several directories: ip, ip6, inet, arp,
+bridge and any.
+
+ * "ip" folder contains the test files that are executed in ip and inet
+ table.
+
+ * "ip" folder contains the test files that are executed in ip6 and inet
+ table.
+
+ * "inet" folder contains the test files that are executed in the ip, ip6
+ and inet table.
+
+ * "arp" folder contains the test files that are executed in the arp
+ table.
+
+ * "bridge" folder: Here are the test files are executed in bridge
+ tables.
+
+ * "any" folder: Here are the test files are executed in ip, ip6, inet,
+ arp and bridge tables.
+
+E) Meaning of messages:
+
+* A warning message means the rule input and output of nft mismatches.
+* An error message means the nft-tool shows an error when we add it or
+ the listing is broken after the rule is added.
+
+F) Acknowledgements
+
+Thanks to the Outreach Program for Women (OPW) for sponsoring this test
+infrastructure and my mentor Pablo Neira.
+
+-EOF-
diff --git a/tests/regression/nft-test.py b/tests/regression/nft-test.py
new file mode 100755
index 00000000..d4929f08
--- /dev/null
+++ b/tests/regression/nft-test.py
@@ -0,0 +1,859 @@
+#!/usr/bin/python
+#
+# (C) 2014 by Ana Rey Botello <anarey@gmail.com>
+#
+# Based on iptables-test.py:
+# (C) 2012 by Pablo Neira Ayuso <pablo@netfilter.org>"
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# Thanks to the Outreach Program for Women (OPW) for sponsoring this test
+# infrastructure.
+
+import sys
+import os
+import subprocess
+import argparse
+import signal
+
+TERMINAL_PATH = os.getcwd()
+TESTS_PATH = os.path.dirname(os.path.abspath(__file__))
+TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"]
+LOGFILE = "/tmp/nftables-test.log"
+log_file = None
+table_list = []
+chain_list = []
+all_set = dict()
+signal_received = 0
+
+
+class Colors:
+ HEADER = '\033[95m'
+ GREEN = '\033[92m'
+ YELLOW = '\033[93m'
+ RED = '\033[91m'
+ ENDC = '\033[0m'
+
+
+def print_error(reason, filename=None, lineno=None):
+ '''
+ Prints an error with nice colors, indicating file and line number.
+ '''
+ if filename and lineno:
+ print (filename + ": " + Colors.RED + "ERROR:" +
+ Colors.ENDC + " line %d: %s" % (lineno + 1, reason))
+ else:
+ print (Colors.RED + "ERROR:" + Colors.ENDC + " %s" % (reason))
+
+
+def print_warning(reason, filename=None, lineno=None):
+ '''
+ Prints a warning with nice colors, indicating file and line number.
+ '''
+ if filename and lineno:
+ print (filename + ": " + Colors.YELLOW + "WARNING:" + \
+ Colors.ENDC + " line %d: %s" % (lineno + 1, reason))
+ else:
+ print (Colors.YELLOW + "WARNING:" + " %s" % (reason))
+
+
+def print_differences_warning(filename, lineno, rule1, rule2, cmd):
+ reason = "'" + rule1 + "' mismatches '" + rule2 + "'"
+ print filename + ": " + Colors.YELLOW + "WARNING: " + Colors.ENDC + \
+ "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
+
+
+def print_differences_error(filename, lineno, output, cmd):
+ reason = "Listing is broken."
+ print filename + ": " + Colors.RED + "ERROR: " + Colors.ENDC + \
+ "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
+
+
+def table_exist(table, filename, lineno):
+ '''
+ Exists a table.
+ '''
+ cmd = "nft list -nnn table " + table[0] + " " + table[1]
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return True if (ret == 0) else False
+
+
+def table_flush(table, filename, lineno):
+ '''
+ Flush a table.
+ '''
+ cmd = "nft flush table " + str(table[0]) + " " + str(table[1])
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return cmd
+
+
+def table_create(table, filename, lineno):
+ '''
+ Adds a table.
+ '''
+ ## We check if table exists.
+ if table_exist(table, filename, lineno):
+ reason = "Table " + table[1] + " already exists"
+ print_error(reason, filename, lineno)
+ return -1
+
+ table_list.append(table)
+
+ ## We add a new table
+ cmd = "nft add table " + table[0] + " " + table[1]
+ ret = execute_cmd(cmd, filename, lineno)
+
+ if ret != 0:
+ reason = "Cannot add table " + table[1]
+ print_error(reason, filename, lineno)
+ table_list.remove(table)
+ return -1
+
+ ## We check if table was added correctly.
+ if not table_exist(table, filename, lineno):
+ table_list.remove(table)
+ reason = "I have just added the table " + table[1] + \
+ " but it does not exist. Giving up!"
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def table_delete(table, filename=None, lineno=None):
+ '''
+ Deletes a table.
+ '''
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ if not table_exist(table, filename, lineno):
+ reason = "Table " + table[1] + \
+ " does not exist but I added it before."
+ print_error(reason, filename, lineno)
+ return -1
+
+ cmd = "nft delete table" + table_info
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = cmd + ": " \
+ "I cannot delete table '" + table[1] + "'. Giving up! "
+ print_error(reason, filename, lineno)
+ return -1
+
+ if table_exist(table, filename, lineno):
+ reason = "I have just deleted the table " + table[1] + \
+ " but the table still exists."
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def chain_exist(chain, table, filename, lineno):
+ '''
+ Checks a chain
+ '''
+
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft list -nnn chain" + table_info + chain
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return True if (ret == 0) else False
+
+
+def chain_create(chain, chain_type, chain_list, table, filename, lineno):
+ '''
+ Adds a chain
+ '''
+
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ if chain_exist(chain, table, filename, lineno):
+ reason = "This chain '" + chain + "' exists in " + table[1] + "." + \
+ "I cannot create two chains with same name."
+ print_error(reason, filename, lineno)
+ return -1
+
+ if chain_type:
+ cmd = "nft add chain" + table_info + chain + "\{ " + chain_type + "\; \}"
+ else:
+ cmd = "nft add chain" + table_info + chain
+
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = "I cannot create the chain '" + chain
+ print_error(reason, filename, lineno)
+ return -1
+
+ if not chain in chain_list:
+ chain_list.append(chain)
+
+ if not chain_exist(chain, table, filename, lineno):
+ reason = "I have added the chain '" + chain + \
+ "' but it does not exist in " + table[1]
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def chain_delete(chain, table, filename=None, lineno=None):
+ '''
+ Flushes and deletes a chain.
+ '''
+
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ if not chain_exist(chain, table, filename, lineno):
+ reason = "The chain " + chain + " does not exists in " + table[1] + \
+ ". I cannot delete it."
+ print_error(reason, filename, lineno)
+ return -1
+
+ cmd = "nft flush chain" + table_info + chain
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = "I cannot flush this chain " + chain
+ print_error(reason, filename, lineno)
+ return -1
+
+ cmd = "nft delete chain" + table_info + chain
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = cmd + "I cannot delete this chain. DD"
+ print_error(reason, filename, lineno)
+ return -1
+
+ if chain_exist(chain, table, filename, lineno):
+ reason = "The chain " + chain + " exists in " + table[1] + \
+ ". I cannot delete this chain"
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_add(set_info, table_list, filename, lineno):
+ '''
+ Adds a set.
+ '''
+
+ if not table_list:
+ reason = "Missing table to add rule"
+ print_error(reason, filename, lineno)
+ return -1
+
+ for table in table_list:
+ if set_exist(set_info[0], table, filename, lineno):
+ reason = "This set " + set_info + " exists in " + table[1] + \
+ ". I cannot add it again"
+ print_error(reason, filename, lineno)
+ return -1
+
+ table_info = " " + table[0] + " " + table[1] + " "
+ set_text = " " + set_info[0] + " { type " + set_info[1] + " \;}"
+ cmd = "nft add set" + table_info + set_text
+ ret = execute_cmd(cmd, filename, lineno)
+
+ if (ret == 0 and set_info[2].rstrip() == "fail") or \
+ (ret != 0 and set_info[2].rstrip() == "ok"):
+ reason = cmd + ": " + "I cannot add the set " + set_info[0]
+ print_error(reason, filename, lineno)
+ return -1
+
+ if not set_exist(set_info[0], table, filename, lineno):
+ reason = "I have just added the set " + set_info[0] + \
+ " to the table " + table[1] + " but it does not exist"
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_add_elements(set_element, set_name, set_all, state, table_list,
+ filename, lineno):
+ '''
+ Adds elements to the set.
+ '''
+
+ if not table_list:
+ reason = "Missing table to add rules"
+ print_error(reason, filename, lineno)
+ return -1
+
+ for table in table_list:
+ # Check if set exists.
+ if (not set_exist(set_name, table, filename, lineno) or
+ not set_name in set_all) and state == "ok":
+ reason = "I cannot add an element to the set " + set_name + \
+ " since it does not exist."
+ print_error(reason, filename, lineno)
+ return -1
+
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ element = ""
+ for e in set_element:
+ if not element:
+ element = e
+ else:
+ element = element + ", " + e
+
+ set_text = set_name + " { " + element + " }"
+ cmd = "nft add element -nnn" + table_info + set_text
+ ret = execute_cmd(cmd, filename, lineno)
+
+ if (state == "fail" and ret == 0) or (state == "ok" and ret != 0):
+ test_state = "This rule should have failed."
+ reason = cmd + ": " + test_state
+ print_error(reason, filename, lineno)
+ return -1
+
+ # Add element into a all_set.
+ if (ret == 0 and state == "ok"):
+ for e in set_element:
+ set_all[set_name].add(e)
+
+ return 0
+
+
+def set_delete_elements(set_element, set_name, table, filename=None,
+ lineno=None):
+ '''
+ Deletes elements in a set.
+ '''
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ for element in set_element:
+ set_text = set_name + " {" + element + "}"
+ cmd = "nft delete element -nnn" + table_info + set_text
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = "I cannot delete an element" + element + \
+ " from the set '" + set_name
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_delete(all_set, table, filename=None, lineno=None):
+ '''
+ Deletes set and its content.
+ '''
+
+ for set_name in all_set.keys():
+ # Check if exists the set
+ if not set_exist(set_name, table, filename, lineno):
+ reason = "The set " + set_name + \
+ " does not exist, I cannot delete it"
+ print_error(reason, filename, lineno)
+ return -1
+
+ # We delete all elements in the set
+ set_delete_elements(all_set[set_name], set_name, table, filename,
+ lineno)
+
+ # We delete the set.
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft delete set " + table_info + " " + set_name
+ ret = execute_cmd(cmd, filename, lineno)
+
+ # Check if the set still exists after I deleted it.
+ if ret != 0 or set_exist(set_name, table, filename, lineno):
+ reason = "Cannot remove the set " + set_name
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_exist(set_name, table, filename, lineno):
+ '''
+ Check if the set exists.
+ '''
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft list -nnn set" + table_info + set_name
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return True if (ret == 0) else False
+
+
+def set_check_element(rule1, rule2):
+ '''
+ Check if element exists in anonymous sets.
+ '''
+ ret = -1
+ pos1 = rule1.find("{")
+ pos2 = rule2.find("{")
+ end1 = rule1.find("}")
+ end2 = rule2.find("}")
+
+ if ((pos1 != -1) and (pos2 != -1) and (end1 != -1) and (end2 != -1)):
+ list1 = (rule1[pos1 + 1:end1].replace(" ", "")).split(",")
+ list2 = (rule2[pos2 + 1:end2].replace(" ", "")).split(",")
+ list1.sort()
+ list2.sort()
+ if (cmp(list1, list2) == 0):
+ ret = 0
+ return ret
+
+
+def output_clean(pre_output, chain):
+ pos_chain = pre_output[0].find(chain)
+ if pos_chain == -1:
+ return ""
+ output_intermediate = pre_output[0][pos_chain:]
+ brace_start = output_intermediate.find("{")
+ brace_end = output_intermediate.find("}")
+ pre_rule = output_intermediate[brace_start:brace_end]
+ if pre_rule[1:].find("{") > -1: # this rule has a set.
+ set = pre_rule[1:].replace("\t", "").replace("\n", "").strip()
+ set = set.split(";")[1].strip() + "}"
+ return set
+ else:
+ rule = pre_rule.split(";")[1].replace("\t", "").replace("\n", "").strip()
+ if len(rule) < 0:
+ return ""
+ return rule
+
+
+def rule_add(rule, table_list, chain_list, filename, lineno,
+ force_all_family_option):
+ '''
+ Adds a rule
+ '''
+ # TODO Check if a rule is added correctly.
+ ret = warning = error = unit_tests = 0
+
+ if not table_list or not chain_list:
+ reason = "Missing table or chain to add rule."
+ print_error(reason, filename, lineno)
+ return [-1, warning, error, unit_tests]
+
+ for table in table_list:
+ for chain in chain_list:
+ if len(rule) == 1:
+ reason = "Skipping malformed test. (" + \
+ str(rule[0].rstrip('\n')) + ")"
+ print_warning(reason, filename, lineno)
+ continue
+
+ unit_tests += 1
+ table_flush(table, filename, lineno)
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft add rule -nnn" + table_info + chain + " " + rule[0]
+
+ ret = execute_cmd(cmd, filename, lineno)
+
+ state = rule[1].rstrip()
+ if (ret == 0 and state == "fail") or (ret != 0 and state == "ok"):
+ if state == "fail":
+ test_state = "This rule should have failed."
+ else:
+ test_state = "This rule should not have failed."
+ reason = cmd + ": " + test_state
+ print_error(reason, filename, lineno)
+ ret = -1
+ error += 1
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+
+ if (state == "fail" and ret != 0):
+ ret = 0
+ continue
+
+ if ret == 0:
+ # Check output of nft
+ process = subprocess.Popen(['nft', '-nnn', 'list', 'table'] + table,
+ shell=False, stdout=subprocess.PIPE,
+ preexec_fn=preexec)
+ pre_output = process.communicate()
+ output = pre_output[0].split(";")
+ if len(output) < 2:
+ reason = cmd + ": Listing is broken."
+ print_error(reason, filename, lineno)
+ ret = -1
+ error += 1
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+ else:
+ rule_output = output_clean(pre_output, chain)
+ if (len(rule) == 3):
+ teoric_exit = rule[2]
+ else:
+ teoric_exit = rule[0]
+
+ if (rule_output.rstrip() != teoric_exit.rstrip()):
+ if (rule[0].find("{") != -1): # anonymous sets
+ if (set_check_element(teoric_exit, rule_output) != 0):
+ warning += 1
+ print_differences_warning(filename, lineno,
+ rule[0], rule_output,
+ cmd)
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+ else:
+ if len(rule_output) <= 0:
+ error += 1
+ print_differences_error(filename, lineno,
+ rule_output, cmd)
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+ if rule[0].find(rule_output.split(" ")[0]) > -1:
+ warning += 1
+ print_differences_warning(filename, lineno,
+ rule[0], rule_output,
+ cmd)
+ else:
+ error += 1
+ print_differences_error(filename, lineno,
+ rule_output, cmd)
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+
+ return [ret, warning, error, unit_tests]
+
+
+def preexec():
+ os.setpgrp() # Don't forward signals.
+
+
+def cleanup_on_exit():
+ for table in table_list:
+ for chain in chain_list:
+ ret = chain_delete(chain, table, "", "")
+ if all_set:
+ ret = set_delete(all_set, table)
+ ret = table_delete(table)
+
+
+def signal_handler(signal, frame):
+ global signal_received
+ signal_received = 1
+
+
+def execute_cmd(cmd, filename, lineno):
+ '''
+ Executes a command, checks for segfaults and returns the command exit
+ code.
+
+ :param cmd: string with the command to be executed
+ :param filename: name of the file tested (used for print_error purposes)
+ :param lineno: line number being tested (used for print_error purposes)
+ '''
+ global log_file
+ print >> log_file, "command: %s" % cmd
+ if debug_option:
+ print cmd
+ ret = subprocess.call(cmd, shell=True, universal_newlines=True,
+ stderr=subprocess.STDOUT, stdout=log_file,
+ preexec_fn=preexec)
+ log_file.flush()
+
+ if ret == -11:
+ reason = "command segfaults: " + cmd
+ print_error(reason, filename, lineno)
+
+ return ret
+
+
+def print_result(filename, tests, warning, error):
+ return str(filename) + ": " + str(tests) + " unit tests, " + \
+ str(error) + " error, " + str(warning) + " warning"
+
+
+def print_result_all(filename, tests, warning, error, unit_tests):
+ return str(filename) + ": " + str(tests) + " unit tests, " +\
+ str(unit_tests) + " total test executed, " + \
+ str(error) + " error, " + \
+ str(warning) + " warning"
+
+
+def table_process(table_line, filename, lineno):
+ if ";" in table_line:
+ table_info = table_line.split(";")
+ else:
+ table_info.append("ip")
+ table_info.append(table_line)
+
+ return table_create(table_info, filename, lineno)
+
+
+def chain_process(chain_line, filename, lineno):
+ chain_name = chain_line[0]
+ chain_type = ""
+ for table in table_list:
+ if len(chain_line) > 1:
+ chain_type = chain_line[1]
+ ret = chain_create(chain_name, chain_type, chain_list, table,
+ filename, lineno)
+ if ret != 0:
+ return -1
+ return ret
+
+
+def set_process(set_line, filename, lineno):
+ set_info = []
+ set_name = "".join(set_line[0].rstrip()[1:])
+ set_info.append(set_name)
+ set_type = set_line[1].split(";")[0]
+ set_state = set_line[1].split(";")[1] # ok or fail
+ set_info.append(set_type)
+ set_info.append(set_state)
+ ret = set_add(set_info, table_list, filename, lineno)
+ if ret == 0:
+ all_set[set_name] = set()
+
+ return ret
+
+
+def set_element_process(element_line, filename, lineno):
+ rule_state = element_line[1]
+ set_name = element_line[0].split(" ")[0]
+ set_element = element_line[0].split(" ")
+ set_element.remove(set_name)
+ return set_add_elements(set_element, set_name, all_set, rule_state,
+ table_list, filename, lineno)
+
+
+def run_test_file(filename, force_all_family_option, specific_file):
+ '''
+ Runs a test file
+
+ :param filename: name of the file with the test rules
+ '''
+
+ if specific_file:
+ filename_path = os.path.join(TERMINAL_PATH, filename)
+ else:
+ filename_path = os.path.join(TESTS_PATH, filename)
+
+ f = open(filename_path)
+ tests = passed = total_unit_run = total_warning = total_error = 0
+ table = ""
+ total_test_passed = True
+
+ for lineno, line in enumerate(f):
+ if signal_received == 1:
+ print "\nSignal received. Cleaning up and Exitting..."
+ cleanup_on_exit()
+ sys.exit(0)
+
+ if line.isspace():
+ continue
+
+ if line[0] == "#": # Command-line
+ continue
+
+ if line[0] == '*': # Table
+ table_line = line.rstrip()[1:]
+ ret = table_process(table_line, filename, lineno)
+ if (ret != 0):
+ total_test_passed = False
+ break
+ continue
+
+ if line[0] == ":": # Chain
+ chain_line = line.rstrip()[1:].split(";")
+ ret = chain_process(chain_line, filename, lineno)
+ if ret != 0:
+ total_test_passed = False
+ break
+ continue
+
+ if line[0] == "!": # Adds this set
+ set_line = line.rstrip()[0:].split(" ")
+ ret = set_process(set_line, filename, lineno)
+ tests += 1
+ if ret == -1:
+ total_test_passed = False
+ continue
+ passed += 1
+ continue
+
+ if line[0] == "?": # Adds elements in a set
+ element_line = line.rstrip()[1:].split(";")
+ ret = set_element_process(element_line, filename, lineno)
+ tests += 1
+ if ret == -1:
+ total_test_passed = False
+ continue
+
+ passed += 1
+ continue
+
+ # Rule
+ rule = line.split(';') # rule[1] Ok or FAIL
+ if line[0] == "-": # Run omitted lines
+ if line[1:].find("*") != -1:
+ continue
+ if need_fix_option:
+ rule[0] = rule[0].rstrip()[1:]
+ result = rule_add(rule, table_list, chain_list, filename,
+ lineno, force_all_family_option)
+ tests += 1
+ warning = result[1]
+ ret = result[0]
+ total_warning += warning
+ total_error += result[2]
+ total_unit_run += result[3]
+
+ if ret != 0:
+ total_test_passed = False
+ elif warning == 0:
+ passed += 1
+ continue
+ else:
+ continue
+ if need_fix_option:
+ continue
+
+ result = rule_add(rule, table_list, chain_list, filename, lineno,
+ force_all_family_option)
+ tests += 1
+ ret = result[0]
+ warning = result[1]
+ total_warning += warning
+ total_error += result[2]
+ total_unit_run += result[3]
+
+ if ret != 0:
+ total_test_passed = False
+ continue
+
+ if warning == 0: # All ok.
+ passed += 1
+
+ # Delete rules, sets, chains and tables
+ for table in table_list:
+ # We delete chains
+ for chain in chain_list:
+ ret = chain_delete(chain, table, filename, lineno)
+ if ret != 0:
+ total_test_passed = False
+
+ # We delete sets.
+ if all_set:
+ ret = set_delete(all_set, table, filename, lineno)
+ if ret != 0:
+ total_test_passed = False
+ reason = "There is a problem when we delete a set"
+ print_error(reason, filename, lineno)
+
+ # We delete tables.
+ ret = table_delete(table, filename, lineno)
+
+ if ret != 0:
+ total_test_passed = False
+
+ if specific_file:
+ if force_all_family_option:
+ print print_result_all(filename, tests, total_warning, total_error,
+ total_unit_run)
+ else:
+ print print_result(filename, tests, total_warning, total_error)
+ else:
+ if (tests == passed and tests > 0):
+ print filename + ": " + Colors.GREEN + "OK" + Colors.ENDC
+
+ f.close()
+ del table_list[:]
+ del chain_list[:]
+ all_set.clear()
+
+ return [tests, passed, total_warning, total_error, total_unit_run]
+
+
+def main():
+ parser = argparse.ArgumentParser(description='Run nft tests',
+ version='1.0')
+
+ parser.add_argument('filename', nargs='?',
+ metavar='path/to/file.t',
+ help='Run only this test')
+
+ parser.add_argument('-d', '--debug', action='store_true',
+ dest='debug',
+ help='enable debugging mode')
+
+ parser.add_argument('-e', '--need-fix', action='store_true',
+ dest='need_fix_line',
+ help='run rules that need a fix')
+
+ parser.add_argument('-f', '--force-family', action='store_true',
+ dest='force_all_family',
+ help='keep testing all families on error')
+
+ args = parser.parse_args()
+ global debug_option, need_fix_option
+ debug_option = args.debug
+ need_fix_option = args.need_fix_line
+ force_all_family_option = args.force_all_family
+ specific_file = False
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ if os.getuid() != 0:
+ print "You need to be root to run this, sorry"
+ return
+
+ test_files = files_ok = run_total = 0
+ tests = passed = warnings = errors = 0
+ global log_file
+ try:
+ log_file = open(LOGFILE, 'w')
+ except IOError:
+ print "Cannot open log file %s" % LOGFILE
+ return
+
+ file_list = []
+ if args.filename:
+ file_list = [args.filename]
+ specific_file = True
+ else:
+ for directory in TESTS_DIRECTORY:
+ path = os.path.join(TESTS_PATH, directory)
+ for root, dirs, files in os.walk(path):
+ for f in files:
+ if f.endswith(".t"):
+ file_list.append(os.path.join(directory, f))
+
+ for filename in file_list:
+ result = run_test_file(filename, force_all_family_option, specific_file)
+ file_tests = result[0]
+ file_passed = result[1]
+ file_warnings = result[2]
+ file_errors = result[3]
+ file_unit_run = result[4]
+
+ if file_warnings == 0 and file_tests == file_passed:
+ files_ok += 1
+ if file_tests:
+ tests += file_tests
+ passed += file_passed
+ errors += file_errors
+ warnings += file_warnings
+ test_files += 1
+ if force_all_family_option:
+ run_total += file_unit_run
+
+ if test_files == 0:
+ print "No test files to run"
+ else:
+ if not specific_file:
+ if force_all_family_option:
+ print ("%d test files, %d files passed, %d unit tests, %d total executed, %d error, %d warning" %
+ (test_files, files_ok, tests, run_total, errors, warnings))
+ else:
+ print ("%d test files, %d files passed, %d unit tests, %d error, %d warning" %
+ (test_files, files_ok, tests, errors, warnings))
+
+if __name__ == '__main__':
+ main()