summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorAna Rey <anarey@gmail.com>2014-09-18 12:39:15 +0200
committerPablo Neira Ayuso <pablo@netfilter.org>2014-09-18 12:51:53 +0200
commita7fff8bc3df3c51f8d768912d2899be939ae9a97 (patch)
tree06b896b013efb9fc141a9206bf1472fb56876136 /tests
parentcdde067c88642d2218237aae068202654134a348 (diff)
tests: Add automated regression testing
Here, the automated regression testing for nftables and some test files. This script checks that the rule input and output of nft matches. More details here below. A) What is this testing? This script tests two different paths: * The rule input from the command-line. This checks the different steps from the command line to the kernel. This includes the parsing, evaluation and netlink generation steps. * The output listing that is obtained from the kernel. This checks the different steps from the kernel to the command line: The netlink message parsing, postprocess and textify steps to display the rule listing. As a final step, this script compares that the rule that is added can be listed by nft. B) What options are available? The script offers the following options: * Execute test files: ./nft-test.py # Run all test files ./nft-test.py path/file.t # Run this test file If there is a problem, it shows the differences between the rule that is added and the rule that is listed by nft. In case you hit an error, the script doesn't keep testing for more families. Unless you specify the --force-family option. * Execute broken tests: ./nft-test.sh -e This runs tests for rules that need a fix: This mode runs the lines that that start with a "-" symbol. * Debugging: ./nft-test.sh -d This shows all the commands that the script executes, so you can watch its internal behaviour. * Keep testing all families on error. ./nft-test.sh -f Don't stop testing for more families in case of error. C) What is the structure of the test file? A test file contains a set of rules that are added in the system. Here, an example of a test file: *ip;test-ipv4 # line 1 *ip6;test-ipv6 # line 2 *inet;test-inet # line 3 :input;type filter hook input priority 0 # line 4 ah hdrlength != 11-23;ok;ah hdrlength < 11 ah hdrlength > 23 # line 5 - tcp dport != {22-25} # line 6 !set1 ipv4_addr;ok # line 7 ?set1 192.168.3.8 192.168.3.9;ok # line 8 # This is a commented-line. # line 9 Line 1 defines a table. The name of the table is 'test-ip' and the family is ip. Lines 2 and 3 defines more tables for different families so the rules in this test file are also tested there. Line 4 defines the chain. The name of this chain is "input". The type is "filter", the hook is "input" and the priority is 0. Line 5 defines the rule, the ";" character is used as separator of several parts: * Part 1: "ah hdrlength != 11-23" is the rule to check. * Part 2: "ok" is the result expected with the execute of this rule. * Part 3: "ah hdrlength < 11 ah hdrlength > 23". This is the expected output. You can leave this empty if the output is the same as the input. Line 6 is a marked line. This means that this rule is tested if '-e' is passed as argument to nft-test.py. Line 7 adds a new set. The name of this set is "set1" and the type of this set is "ipv4_add". Line 8 adds two elements into the 'set1' set: "192.168.3.8" and "192.168.3.9". A whitespace separates the elements of the set. Line 9 uses the "#" symbol that means that this line is commented out. D) The test folders The test files are divided in several directories: ip, ip6, inet, arp, bridge and any. * "ip" folder contains the test files that are executed in ip and inet table. * "ip" folder contains the test files that are executed in ip6 and inet table. * "inet" folder contains the test files that are executed in the ip, ip6 and inet table. * "arp" folder contains the test files that are executed in the arp table. * "bridge" folder: Here are the test files are executed in bridge tables. * "any" folder: Here are the test files are executed in ip, ip6, inet, arp and bridge tables. E) Meaning of messages: * A warning message means the rule input and output of nft mismatches. * An error message means the nft-tool shows an error when we add it or the listing is broken after the rule is added. Signed-off-by: Ana Rey <anarey@gmail.com> Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
Diffstat (limited to 'tests')
-rw-r--r--tests/regression/README141
-rwxr-xr-xtests/regression/nft-test.py859
2 files changed, 1000 insertions, 0 deletions
diff --git a/tests/regression/README b/tests/regression/README
new file mode 100644
index 00000000..7b66d229
--- /dev/null
+++ b/tests/regression/README
@@ -0,0 +1,141 @@
+Author: Ana Rey <anarey@gmail.com>
+Date: 18/Sept/2014
+
+Here, the automated regression testing for nftables and some test
+files.
+
+This script checks that the rule input and output of nft matches.
+More details here below.
+
+A) What is this testing?
+
+This script tests two different paths:
+
+* The rule input from the command-line. This checks the different steps
+ from the command line to the kernel. This includes the parsing,
+ evaluation and netlink generation steps.
+
+* The output listing that is obtained from the kernel. This checks the
+ different steps from the kernel to the command line: The netlink
+ message parsing, postprocess and textify steps to display the rule
+ listing.
+
+As a final step, this script compares that the rule that is added can
+be listed by nft.
+
+B) What options are available?
+
+The script offers the following options:
+
+* Execute test files:
+
+./nft-test.py # Run all test files
+./nft-test.py path/file.t # Run this test file
+
+If there is a problem, it shows the differences between the rule that
+is added and the rule that is listed by nft.
+
+In case you hit an error, the script doesn't keep testing for more
+families. Unless you specify the --force-family option.
+
+* Execute broken tests:
+
+./nft-test.sh -e
+
+This runs tests for rules that need a fix: This mode runs the lines that
+that start with a "-" symbol.
+
+* Debugging:
+
+./nft-test.sh -d
+
+This shows all the commands that the script executes, so you can watch
+its internal behaviour.
+
+* Keep testing all families on error.
+
+./nft-test.sh -f
+
+Don't stop testing for more families in case of error.
+
+C) What is the structure of the test file?
+
+A test file contains a set of rules that are added in the system.
+
+Here, an example of a test file:
+
+ *ip;test-ipv4 # line 1
+ *ip6;test-ipv6 # line 2
+ *inet;test-inet # line 3
+
+ :input;type filter hook input priority 0 # line 4
+
+ ah hdrlength != 11-23;ok;ah hdrlength < 11 ah hdrlength > 23 # line 5
+ - tcp dport != {22-25} # line 6
+
+ !set1 ipv4_addr;ok # line 7
+ ?set1 192.168.3.8 192.168.3.9;ok # line 8
+ # This is a commented-line. # line 9
+
+Line 1 defines a table. The name of the table is 'test-ip' and the
+family is ip. Lines 2 and 3 defines more tables for different families
+so the rules in this test file are also tested there.
+
+Line 4 defines the chain. The name of this chain is "input". The type is
+"filter", the hook is "input" and the priority is 0.
+
+Line 5 defines the rule, the ";" character is used as separator of several
+parts:
+
+* Part 1: "ah hdrlength != 11-23" is the rule to check.
+* Part 2: "ok" is the result expected with the execute of this rule.
+* Part 3: "ah hdrlength < 11 ah hdrlength > 23". This is the expected
+ output. You can leave this empty if the output is the same as the
+ input.
+
+Line 6 is a marked line. This means that this rule is tested if
+'-e' is passed as argument to nft-test.py.
+
+Line 7 adds a new set. The name of this set is "set1" and the type
+of this set is "ipv4_add".
+
+Line 8 adds two elements into the 'set1' set: "192.168.3.8" and
+"192.168.3.9". A whitespace separates the elements of the set.
+
+Line 9 uses the "#" symbol that means that this line is commented out.
+
+D) The test folders
+
+The test files are divided in several directories: ip, ip6, inet, arp,
+bridge and any.
+
+ * "ip" folder contains the test files that are executed in ip and inet
+ table.
+
+ * "ip" folder contains the test files that are executed in ip6 and inet
+ table.
+
+ * "inet" folder contains the test files that are executed in the ip, ip6
+ and inet table.
+
+ * "arp" folder contains the test files that are executed in the arp
+ table.
+
+ * "bridge" folder: Here are the test files are executed in bridge
+ tables.
+
+ * "any" folder: Here are the test files are executed in ip, ip6, inet,
+ arp and bridge tables.
+
+E) Meaning of messages:
+
+* A warning message means the rule input and output of nft mismatches.
+* An error message means the nft-tool shows an error when we add it or
+ the listing is broken after the rule is added.
+
+F) Acknowledgements
+
+Thanks to the Outreach Program for Women (OPW) for sponsoring this test
+infrastructure and my mentor Pablo Neira.
+
+-EOF-
diff --git a/tests/regression/nft-test.py b/tests/regression/nft-test.py
new file mode 100755
index 00000000..d4929f08
--- /dev/null
+++ b/tests/regression/nft-test.py
@@ -0,0 +1,859 @@
+#!/usr/bin/python
+#
+# (C) 2014 by Ana Rey Botello <anarey@gmail.com>
+#
+# Based on iptables-test.py:
+# (C) 2012 by Pablo Neira Ayuso <pablo@netfilter.org>"
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# Thanks to the Outreach Program for Women (OPW) for sponsoring this test
+# infrastructure.
+
+import sys
+import os
+import subprocess
+import argparse
+import signal
+
+TERMINAL_PATH = os.getcwd()
+TESTS_PATH = os.path.dirname(os.path.abspath(__file__))
+TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"]
+LOGFILE = "/tmp/nftables-test.log"
+log_file = None
+table_list = []
+chain_list = []
+all_set = dict()
+signal_received = 0
+
+
+class Colors:
+ HEADER = '\033[95m'
+ GREEN = '\033[92m'
+ YELLOW = '\033[93m'
+ RED = '\033[91m'
+ ENDC = '\033[0m'
+
+
+def print_error(reason, filename=None, lineno=None):
+ '''
+ Prints an error with nice colors, indicating file and line number.
+ '''
+ if filename and lineno:
+ print (filename + ": " + Colors.RED + "ERROR:" +
+ Colors.ENDC + " line %d: %s" % (lineno + 1, reason))
+ else:
+ print (Colors.RED + "ERROR:" + Colors.ENDC + " %s" % (reason))
+
+
+def print_warning(reason, filename=None, lineno=None):
+ '''
+ Prints a warning with nice colors, indicating file and line number.
+ '''
+ if filename and lineno:
+ print (filename + ": " + Colors.YELLOW + "WARNING:" + \
+ Colors.ENDC + " line %d: %s" % (lineno + 1, reason))
+ else:
+ print (Colors.YELLOW + "WARNING:" + " %s" % (reason))
+
+
+def print_differences_warning(filename, lineno, rule1, rule2, cmd):
+ reason = "'" + rule1 + "' mismatches '" + rule2 + "'"
+ print filename + ": " + Colors.YELLOW + "WARNING: " + Colors.ENDC + \
+ "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
+
+
+def print_differences_error(filename, lineno, output, cmd):
+ reason = "Listing is broken."
+ print filename + ": " + Colors.RED + "ERROR: " + Colors.ENDC + \
+ "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason
+
+
+def table_exist(table, filename, lineno):
+ '''
+ Exists a table.
+ '''
+ cmd = "nft list -nnn table " + table[0] + " " + table[1]
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return True if (ret == 0) else False
+
+
+def table_flush(table, filename, lineno):
+ '''
+ Flush a table.
+ '''
+ cmd = "nft flush table " + str(table[0]) + " " + str(table[1])
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return cmd
+
+
+def table_create(table, filename, lineno):
+ '''
+ Adds a table.
+ '''
+ ## We check if table exists.
+ if table_exist(table, filename, lineno):
+ reason = "Table " + table[1] + " already exists"
+ print_error(reason, filename, lineno)
+ return -1
+
+ table_list.append(table)
+
+ ## We add a new table
+ cmd = "nft add table " + table[0] + " " + table[1]
+ ret = execute_cmd(cmd, filename, lineno)
+
+ if ret != 0:
+ reason = "Cannot add table " + table[1]
+ print_error(reason, filename, lineno)
+ table_list.remove(table)
+ return -1
+
+ ## We check if table was added correctly.
+ if not table_exist(table, filename, lineno):
+ table_list.remove(table)
+ reason = "I have just added the table " + table[1] + \
+ " but it does not exist. Giving up!"
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def table_delete(table, filename=None, lineno=None):
+ '''
+ Deletes a table.
+ '''
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ if not table_exist(table, filename, lineno):
+ reason = "Table " + table[1] + \
+ " does not exist but I added it before."
+ print_error(reason, filename, lineno)
+ return -1
+
+ cmd = "nft delete table" + table_info
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = cmd + ": " \
+ "I cannot delete table '" + table[1] + "'. Giving up! "
+ print_error(reason, filename, lineno)
+ return -1
+
+ if table_exist(table, filename, lineno):
+ reason = "I have just deleted the table " + table[1] + \
+ " but the table still exists."
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def chain_exist(chain, table, filename, lineno):
+ '''
+ Checks a chain
+ '''
+
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft list -nnn chain" + table_info + chain
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return True if (ret == 0) else False
+
+
+def chain_create(chain, chain_type, chain_list, table, filename, lineno):
+ '''
+ Adds a chain
+ '''
+
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ if chain_exist(chain, table, filename, lineno):
+ reason = "This chain '" + chain + "' exists in " + table[1] + "." + \
+ "I cannot create two chains with same name."
+ print_error(reason, filename, lineno)
+ return -1
+
+ if chain_type:
+ cmd = "nft add chain" + table_info + chain + "\{ " + chain_type + "\; \}"
+ else:
+ cmd = "nft add chain" + table_info + chain
+
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = "I cannot create the chain '" + chain
+ print_error(reason, filename, lineno)
+ return -1
+
+ if not chain in chain_list:
+ chain_list.append(chain)
+
+ if not chain_exist(chain, table, filename, lineno):
+ reason = "I have added the chain '" + chain + \
+ "' but it does not exist in " + table[1]
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def chain_delete(chain, table, filename=None, lineno=None):
+ '''
+ Flushes and deletes a chain.
+ '''
+
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ if not chain_exist(chain, table, filename, lineno):
+ reason = "The chain " + chain + " does not exists in " + table[1] + \
+ ". I cannot delete it."
+ print_error(reason, filename, lineno)
+ return -1
+
+ cmd = "nft flush chain" + table_info + chain
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = "I cannot flush this chain " + chain
+ print_error(reason, filename, lineno)
+ return -1
+
+ cmd = "nft delete chain" + table_info + chain
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = cmd + "I cannot delete this chain. DD"
+ print_error(reason, filename, lineno)
+ return -1
+
+ if chain_exist(chain, table, filename, lineno):
+ reason = "The chain " + chain + " exists in " + table[1] + \
+ ". I cannot delete this chain"
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_add(set_info, table_list, filename, lineno):
+ '''
+ Adds a set.
+ '''
+
+ if not table_list:
+ reason = "Missing table to add rule"
+ print_error(reason, filename, lineno)
+ return -1
+
+ for table in table_list:
+ if set_exist(set_info[0], table, filename, lineno):
+ reason = "This set " + set_info + " exists in " + table[1] + \
+ ". I cannot add it again"
+ print_error(reason, filename, lineno)
+ return -1
+
+ table_info = " " + table[0] + " " + table[1] + " "
+ set_text = " " + set_info[0] + " { type " + set_info[1] + " \;}"
+ cmd = "nft add set" + table_info + set_text
+ ret = execute_cmd(cmd, filename, lineno)
+
+ if (ret == 0 and set_info[2].rstrip() == "fail") or \
+ (ret != 0 and set_info[2].rstrip() == "ok"):
+ reason = cmd + ": " + "I cannot add the set " + set_info[0]
+ print_error(reason, filename, lineno)
+ return -1
+
+ if not set_exist(set_info[0], table, filename, lineno):
+ reason = "I have just added the set " + set_info[0] + \
+ " to the table " + table[1] + " but it does not exist"
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_add_elements(set_element, set_name, set_all, state, table_list,
+ filename, lineno):
+ '''
+ Adds elements to the set.
+ '''
+
+ if not table_list:
+ reason = "Missing table to add rules"
+ print_error(reason, filename, lineno)
+ return -1
+
+ for table in table_list:
+ # Check if set exists.
+ if (not set_exist(set_name, table, filename, lineno) or
+ not set_name in set_all) and state == "ok":
+ reason = "I cannot add an element to the set " + set_name + \
+ " since it does not exist."
+ print_error(reason, filename, lineno)
+ return -1
+
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ element = ""
+ for e in set_element:
+ if not element:
+ element = e
+ else:
+ element = element + ", " + e
+
+ set_text = set_name + " { " + element + " }"
+ cmd = "nft add element -nnn" + table_info + set_text
+ ret = execute_cmd(cmd, filename, lineno)
+
+ if (state == "fail" and ret == 0) or (state == "ok" and ret != 0):
+ test_state = "This rule should have failed."
+ reason = cmd + ": " + test_state
+ print_error(reason, filename, lineno)
+ return -1
+
+ # Add element into a all_set.
+ if (ret == 0 and state == "ok"):
+ for e in set_element:
+ set_all[set_name].add(e)
+
+ return 0
+
+
+def set_delete_elements(set_element, set_name, table, filename=None,
+ lineno=None):
+ '''
+ Deletes elements in a set.
+ '''
+ table_info = " " + table[0] + " " + table[1] + " "
+
+ for element in set_element:
+ set_text = set_name + " {" + element + "}"
+ cmd = "nft delete element -nnn" + table_info + set_text
+ ret = execute_cmd(cmd, filename, lineno)
+ if ret != 0:
+ reason = "I cannot delete an element" + element + \
+ " from the set '" + set_name
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_delete(all_set, table, filename=None, lineno=None):
+ '''
+ Deletes set and its content.
+ '''
+
+ for set_name in all_set.keys():
+ # Check if exists the set
+ if not set_exist(set_name, table, filename, lineno):
+ reason = "The set " + set_name + \
+ " does not exist, I cannot delete it"
+ print_error(reason, filename, lineno)
+ return -1
+
+ # We delete all elements in the set
+ set_delete_elements(all_set[set_name], set_name, table, filename,
+ lineno)
+
+ # We delete the set.
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft delete set " + table_info + " " + set_name
+ ret = execute_cmd(cmd, filename, lineno)
+
+ # Check if the set still exists after I deleted it.
+ if ret != 0 or set_exist(set_name, table, filename, lineno):
+ reason = "Cannot remove the set " + set_name
+ print_error(reason, filename, lineno)
+ return -1
+
+ return 0
+
+
+def set_exist(set_name, table, filename, lineno):
+ '''
+ Check if the set exists.
+ '''
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft list -nnn set" + table_info + set_name
+ ret = execute_cmd(cmd, filename, lineno)
+
+ return True if (ret == 0) else False
+
+
+def set_check_element(rule1, rule2):
+ '''
+ Check if element exists in anonymous sets.
+ '''
+ ret = -1
+ pos1 = rule1.find("{")
+ pos2 = rule2.find("{")
+ end1 = rule1.find("}")
+ end2 = rule2.find("}")
+
+ if ((pos1 != -1) and (pos2 != -1) and (end1 != -1) and (end2 != -1)):
+ list1 = (rule1[pos1 + 1:end1].replace(" ", "")).split(",")
+ list2 = (rule2[pos2 + 1:end2].replace(" ", "")).split(",")
+ list1.sort()
+ list2.sort()
+ if (cmp(list1, list2) == 0):
+ ret = 0
+ return ret
+
+
+def output_clean(pre_output, chain):
+ pos_chain = pre_output[0].find(chain)
+ if pos_chain == -1:
+ return ""
+ output_intermediate = pre_output[0][pos_chain:]
+ brace_start = output_intermediate.find("{")
+ brace_end = output_intermediate.find("}")
+ pre_rule = output_intermediate[brace_start:brace_end]
+ if pre_rule[1:].find("{") > -1: # this rule has a set.
+ set = pre_rule[1:].replace("\t", "").replace("\n", "").strip()
+ set = set.split(";")[1].strip() + "}"
+ return set
+ else:
+ rule = pre_rule.split(";")[1].replace("\t", "").replace("\n", "").strip()
+ if len(rule) < 0:
+ return ""
+ return rule
+
+
+def rule_add(rule, table_list, chain_list, filename, lineno,
+ force_all_family_option):
+ '''
+ Adds a rule
+ '''
+ # TODO Check if a rule is added correctly.
+ ret = warning = error = unit_tests = 0
+
+ if not table_list or not chain_list:
+ reason = "Missing table or chain to add rule."
+ print_error(reason, filename, lineno)
+ return [-1, warning, error, unit_tests]
+
+ for table in table_list:
+ for chain in chain_list:
+ if len(rule) == 1:
+ reason = "Skipping malformed test. (" + \
+ str(rule[0].rstrip('\n')) + ")"
+ print_warning(reason, filename, lineno)
+ continue
+
+ unit_tests += 1
+ table_flush(table, filename, lineno)
+ table_info = " " + table[0] + " " + table[1] + " "
+ cmd = "nft add rule -nnn" + table_info + chain + " " + rule[0]
+
+ ret = execute_cmd(cmd, filename, lineno)
+
+ state = rule[1].rstrip()
+ if (ret == 0 and state == "fail") or (ret != 0 and state == "ok"):
+ if state == "fail":
+ test_state = "This rule should have failed."
+ else:
+ test_state = "This rule should not have failed."
+ reason = cmd + ": " + test_state
+ print_error(reason, filename, lineno)
+ ret = -1
+ error += 1
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+
+ if (state == "fail" and ret != 0):
+ ret = 0
+ continue
+
+ if ret == 0:
+ # Check output of nft
+ process = subprocess.Popen(['nft', '-nnn', 'list', 'table'] + table,
+ shell=False, stdout=subprocess.PIPE,
+ preexec_fn=preexec)
+ pre_output = process.communicate()
+ output = pre_output[0].split(";")
+ if len(output) < 2:
+ reason = cmd + ": Listing is broken."
+ print_error(reason, filename, lineno)
+ ret = -1
+ error += 1
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+ else:
+ rule_output = output_clean(pre_output, chain)
+ if (len(rule) == 3):
+ teoric_exit = rule[2]
+ else:
+ teoric_exit = rule[0]
+
+ if (rule_output.rstrip() != teoric_exit.rstrip()):
+ if (rule[0].find("{") != -1): # anonymous sets
+ if (set_check_element(teoric_exit, rule_output) != 0):
+ warning += 1
+ print_differences_warning(filename, lineno,
+ rule[0], rule_output,
+ cmd)
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+ else:
+ if len(rule_output) <= 0:
+ error += 1
+ print_differences_error(filename, lineno,
+ rule_output, cmd)
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+ if rule[0].find(rule_output.split(" ")[0]) > -1:
+ warning += 1
+ print_differences_warning(filename, lineno,
+ rule[0], rule_output,
+ cmd)
+ else:
+ error += 1
+ print_differences_error(filename, lineno,
+ rule_output, cmd)
+ if not force_all_family_option:
+ return [ret, warning, error, unit_tests]
+
+ return [ret, warning, error, unit_tests]
+
+
+def preexec():
+ os.setpgrp() # Don't forward signals.
+
+
+def cleanup_on_exit():
+ for table in table_list:
+ for chain in chain_list:
+ ret = chain_delete(chain, table, "", "")
+ if all_set:
+ ret = set_delete(all_set, table)
+ ret = table_delete(table)
+
+
+def signal_handler(signal, frame):
+ global signal_received
+ signal_received = 1
+
+
+def execute_cmd(cmd, filename, lineno):
+ '''
+ Executes a command, checks for segfaults and returns the command exit
+ code.
+
+ :param cmd: string with the command to be executed
+ :param filename: name of the file tested (used for print_error purposes)
+ :param lineno: line number being tested (used for print_error purposes)
+ '''
+ global log_file
+ print >> log_file, "command: %s" % cmd
+ if debug_option:
+ print cmd
+ ret = subprocess.call(cmd, shell=True, universal_newlines=True,
+ stderr=subprocess.STDOUT, stdout=log_file,
+ preexec_fn=preexec)
+ log_file.flush()
+
+ if ret == -11:
+ reason = "command segfaults: " + cmd
+ print_error(reason, filename, lineno)
+
+ return ret
+
+
+def print_result(filename, tests, warning, error):
+ return str(filename) + ": " + str(tests) + " unit tests, " + \
+ str(error) + " error, " + str(warning) + " warning"
+
+
+def print_result_all(filename, tests, warning, error, unit_tests):
+ return str(filename) + ": " + str(tests) + " unit tests, " +\
+ str(unit_tests) + " total test executed, " + \
+ str(error) + " error, " + \
+ str(warning) + " warning"
+
+
+def table_process(table_line, filename, lineno):
+ if ";" in table_line:
+ table_info = table_line.split(";")
+ else:
+ table_info.append("ip")
+ table_info.append(table_line)
+
+ return table_create(table_info, filename, lineno)
+
+
+def chain_process(chain_line, filename, lineno):
+ chain_name = chain_line[0]
+ chain_type = ""
+ for table in table_list:
+ if len(chain_line) > 1:
+ chain_type = chain_line[1]
+ ret = chain_create(chain_name, chain_type, chain_list, table,
+ filename, lineno)
+ if ret != 0:
+ return -1
+ return ret
+
+
+def set_process(set_line, filename, lineno):
+ set_info = []
+ set_name = "".join(set_line[0].rstrip()[1:])
+ set_info.append(set_name)
+ set_type = set_line[1].split(";")[0]
+ set_state = set_line[1].split(";")[1] # ok or fail
+ set_info.append(set_type)
+ set_info.append(set_state)
+ ret = set_add(set_info, table_list, filename, lineno)
+ if ret == 0:
+ all_set[set_name] = set()
+
+ return ret
+
+
+def set_element_process(element_line, filename, lineno):
+ rule_state = element_line[1]
+ set_name = element_line[0].split(" ")[0]
+ set_element = element_line[0].split(" ")
+ set_element.remove(set_name)
+ return set_add_elements(set_element, set_name, all_set, rule_state,
+ table_list, filename, lineno)
+
+
+def run_test_file(filename, force_all_family_option, specific_file):
+ '''
+ Runs a test file
+
+ :param filename: name of the file with the test rules
+ '''
+
+ if specific_file:
+ filename_path = os.path.join(TERMINAL_PATH, filename)
+ else:
+ filename_path = os.path.join(TESTS_PATH, filename)
+
+ f = open(filename_path)
+ tests = passed = total_unit_run = total_warning = total_error = 0
+ table = ""
+ total_test_passed = True
+
+ for lineno, line in enumerate(f):
+ if signal_received == 1:
+ print "\nSignal received. Cleaning up and Exitting..."
+ cleanup_on_exit()
+ sys.exit(0)
+
+ if line.isspace():
+ continue
+
+ if line[0] == "#": # Command-line
+ continue
+
+ if line[0] == '*': # Table
+ table_line = line.rstrip()[1:]
+ ret = table_process(table_line, filename, lineno)
+ if (ret != 0):
+ total_test_passed = False
+ break
+ continue
+
+ if line[0] == ":": # Chain
+ chain_line = line.rstrip()[1:].split(";")
+ ret = chain_process(chain_line, filename, lineno)
+ if ret != 0:
+ total_test_passed = False
+ break
+ continue
+
+ if line[0] == "!": # Adds this set
+ set_line = line.rstrip()[0:].split(" ")
+ ret = set_process(set_line, filename, lineno)
+ tests += 1
+ if ret == -1:
+ total_test_passed = False
+ continue
+ passed += 1
+ continue
+
+ if line[0] == "?": # Adds elements in a set
+ element_line = line.rstrip()[1:].split(";")
+ ret = set_element_process(element_line, filename, lineno)
+ tests += 1
+ if ret == -1:
+ total_test_passed = False
+ continue
+
+ passed += 1
+ continue
+
+ # Rule
+ rule = line.split(';') # rule[1] Ok or FAIL
+ if line[0] == "-": # Run omitted lines
+ if line[1:].find("*") != -1:
+ continue
+ if need_fix_option:
+ rule[0] = rule[0].rstrip()[1:]
+ result = rule_add(rule, table_list, chain_list, filename,
+ lineno, force_all_family_option)
+ tests += 1
+ warning = result[1]
+ ret = result[0]
+ total_warning += warning
+ total_error += result[2]
+ total_unit_run += result[3]
+
+ if ret != 0:
+ total_test_passed = False
+ elif warning == 0:
+ passed += 1
+ continue
+ else:
+ continue
+ if need_fix_option:
+ continue
+
+ result = rule_add(rule, table_list, chain_list, filename, lineno,
+ force_all_family_option)
+ tests += 1
+ ret = result[0]
+ warning = result[1]
+ total_warning += warning
+ total_error += result[2]
+ total_unit_run += result[3]
+
+ if ret != 0:
+ total_test_passed = False
+ continue
+
+ if warning == 0: # All ok.
+ passed += 1
+
+ # Delete rules, sets, chains and tables
+ for table in table_list:
+ # We delete chains
+ for chain in chain_list:
+ ret = chain_delete(chain, table, filename, lineno)
+ if ret != 0:
+ total_test_passed = False
+
+ # We delete sets.
+ if all_set:
+ ret = set_delete(all_set, table, filename, lineno)
+ if ret != 0:
+ total_test_passed = False
+ reason = "There is a problem when we delete a set"
+ print_error(reason, filename, lineno)
+
+ # We delete tables.
+ ret = table_delete(table, filename, lineno)
+
+ if ret != 0:
+ total_test_passed = False
+
+ if specific_file:
+ if force_all_family_option:
+ print print_result_all(filename, tests, total_warning, total_error,
+ total_unit_run)
+ else:
+ print print_result(filename, tests, total_warning, total_error)
+ else:
+ if (tests == passed and tests > 0):
+ print filename + ": " + Colors.GREEN + "OK" + Colors.ENDC
+
+ f.close()
+ del table_list[:]
+ del chain_list[:]
+ all_set.clear()
+
+ return [tests, passed, total_warning, total_error, total_unit_run]
+
+
+def main():
+ parser = argparse.ArgumentParser(description='Run nft tests',
+ version='1.0')
+
+ parser.add_argument('filename', nargs='?',
+ metavar='path/to/file.t',
+ help='Run only this test')
+
+ parser.add_argument('-d', '--debug', action='store_true',
+ dest='debug',
+ help='enable debugging mode')
+
+ parser.add_argument('-e', '--need-fix', action='store_true',
+ dest='need_fix_line',
+ help='run rules that need a fix')
+
+ parser.add_argument('-f', '--force-family', action='store_true',
+ dest='force_all_family',
+ help='keep testing all families on error')
+
+ args = parser.parse_args()
+ global debug_option, need_fix_option
+ debug_option = args.debug
+ need_fix_option = args.need_fix_line
+ force_all_family_option = args.force_all_family
+ specific_file = False
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ if os.getuid() != 0:
+ print "You need to be root to run this, sorry"
+ return
+
+ test_files = files_ok = run_total = 0
+ tests = passed = warnings = errors = 0
+ global log_file
+ try:
+ log_file = open(LOGFILE, 'w')
+ except IOError:
+ print "Cannot open log file %s" % LOGFILE
+ return
+
+ file_list = []
+ if args.filename:
+ file_list = [args.filename]
+ specific_file = True
+ else:
+ for directory in TESTS_DIRECTORY:
+ path = os.path.join(TESTS_PATH, directory)
+ for root, dirs, files in os.walk(path):
+ for f in files:
+ if f.endswith(".t"):
+ file_list.append(os.path.join(directory, f))
+
+ for filename in file_list:
+ result = run_test_file(filename, force_all_family_option, specific_file)
+ file_tests = result[0]
+ file_passed = result[1]
+ file_warnings = result[2]
+ file_errors = result[3]
+ file_unit_run = result[4]
+
+ if file_warnings == 0 and file_tests == file_passed:
+ files_ok += 1
+ if file_tests:
+ tests += file_tests
+ passed += file_passed
+ errors += file_errors
+ warnings += file_warnings
+ test_files += 1
+ if force_all_family_option:
+ run_total += file_unit_run
+
+ if test_files == 0:
+ print "No test files to run"
+ else:
+ if not specific_file:
+ if force_all_family_option:
+ print ("%d test files, %d files passed, %d unit tests, %d total executed, %d error, %d warning" %
+ (test_files, files_ok, tests, run_total, errors, warnings))
+ else:
+ print ("%d test files, %d files passed, %d unit tests, %d error, %d warning" %
+ (test_files, files_ok, tests, errors, warnings))
+
+if __name__ == '__main__':
+ main()