#!/usr/bin/python # # (C) 2014 by Ana Rey Botello # # Based on iptables-test.py: # (C) 2012 by Pablo Neira Ayuso " # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # Thanks to the Outreach Program for Women (OPW) for sponsoring this test # infrastructure. import sys import os import subprocess import argparse import signal TERMINAL_PATH = os.getcwd() NFT_BIN = "src/nft" TESTS_PATH = os.path.dirname(os.path.abspath(__file__)) TESTS_DIRECTORY = ["any", "arp", "bridge", "inet", "ip", "ip6"] LOGFILE = "/tmp/nftables-test.log" log_file = None table_list = [] chain_list = [] all_set = dict() signal_received = 0 class Colors: if sys.stdout.isatty(): HEADER = '\033[95m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' ENDC = '\033[0m' else: HEADER = '' GREEN = '' YELLOW = '' RED = '' ENDC = '' def print_msg(reason, filename=None, lineno=None, color=None, errstr=None): ''' Prints a message with nice colors, indicating file and line number. ''' if filename and lineno: print (filename + ": " + color + "ERROR:" + Colors.ENDC + " line %d: %s" % (lineno + 1, reason)) else: print (color + "ERROR:" + Colors.ENDC + " %s" % (reason)) def print_error(reason, filename=None, lineno=None): print_msg(reason, filename, lineno, Colors.RED, "ERROR:") def print_warning(reason, filename=None, lineno=None): print_msg(reason, filename, lineno, Colors.YELLOW, "WARNING:") def print_differences_warning(filename, lineno, rule1, rule2, cmd): reason = "'" + rule1 + "' mismatches '" + rule2 + "'" print filename + ": " + Colors.YELLOW + "WARNING: " + Colors.ENDC + \ "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason def print_differences_error(filename, lineno, output, cmd): reason = "Listing is broken." print filename + ": " + Colors.RED + "ERROR: " + Colors.ENDC + \ "line: " + str(lineno + 1) + ": '" + cmd + "': " + reason def table_exist(table, filename, lineno): ''' Exists a table. ''' cmd = NFT_BIN + " list -nnn table " + table[0] + " " + table[1] ret = execute_cmd(cmd, filename, lineno) return True if (ret == 0) else False def table_flush(table, filename, lineno): ''' Flush a table. ''' cmd = NFT_BIN + " flush table " + str(table[0]) + " " + str(table[1]) ret = execute_cmd(cmd, filename, lineno) return cmd def table_create(table, filename, lineno): ''' Adds a table. ''' ## We check if table exists. if table_exist(table, filename, lineno): reason = "Table " + table[1] + " already exists" print_error(reason, filename, lineno) return -1 table_list.append(table) ## We add a new table cmd = NFT_BIN + " add table " + table[0] + " " + table[1] ret = execute_cmd(cmd, filename, lineno) if ret != 0: reason = "Cannot add table " + table[1] print_error(reason, filename, lineno) table_list.remove(table) return -1 ## We check if table was added correctly. if not table_exist(table, filename, lineno): table_list.remove(table) reason = "I have just added the table " + table[1] + \ " but it does not exist. Giving up!" print_error(reason, filename, lineno) return -1 return 0 def table_delete(table, filename=None, lineno=None): ''' Deletes a table. ''' table_info = " " + table[0] + " " + table[1] + " " if not table_exist(table, filename, lineno): reason = "Table " + table[1] + \ " does not exist but I added it before." print_error(reason, filename, lineno) return -1 cmd = NFT_BIN + " delete table" + table_info ret = execute_cmd(cmd, filename, lineno) if ret != 0: reason = cmd + ": " \ "I cannot delete table '" + table[1] + "'. Giving up! " print_error(reason, filename, lineno) return -1 if table_exist(table, filename, lineno): reason = "I have just deleted the table " + table[1] + \ " but the table still exists." print_error(reason, filename, lineno) return -1 return 0 def chain_exist(chain, table, filename, lineno): ''' Checks a chain ''' table_info = " " + table[0] + " " + table[1] + " " cmd = NFT_BIN + " list -nnn chain" + table_info + chain ret = execute_cmd(cmd, filename, lineno) return True if (ret == 0) else False def chain_create(chain, chain_type, chain_list, table, filename, lineno): ''' Adds a chain ''' table_info = " " + table[0] + " " + table[1] + " " if chain_exist(chain, table, filename, lineno): reason = "This chain '" + chain + "' exists in " + table[1] + "." + \ "I cannot create two chains with same name." print_error(reason, filename, lineno) return -1 if chain_type: cmd = NFT_BIN + " add chain" + table_info + chain + "\{ " + chain_type + "\; \}" else: cmd = NFT_BIN + " add chain" + table_info + chain ret = execute_cmd(cmd, filename, lineno) if ret != 0: reason = "I cannot create the chain '" + chain print_error(reason, filename, lineno) return -1 if not chain in chain_list: chain_list.append(chain) if not chain_exist(chain, table, filename, lineno): reason = "I have added the chain '" + chain + \ "' but it does not exist in " + table[1] print_error(reason, filename, lineno) return -1 return 0 def chain_delete(chain, table, filename=None, lineno=None): ''' Flushes and deletes a chain. ''' table_info = " " + table[0] + " " + table[1] + " " if not chain_exist(chain, table, filename, lineno): reason = "The chain " + chain + " does not exists in " + table[1] + \ ". I cannot delete it." print_error(reason, filename, lineno) return -1 cmd = NFT_BIN + " flush chain" + table_info + chain ret = execute_cmd(cmd, filename, lineno) if ret != 0: reason = "I cannot flush this chain " + chain print_error(reason, filename, lineno) return -1 cmd = NFT_BIN + " delete chain" + table_info + chain ret = execute_cmd(cmd, filename, lineno) if ret != 0: reason = cmd + "I cannot delete this chain. DD" print_error(reason, filename, lineno) return -1 if chain_exist(chain, table, filename, lineno): reason = "The chain " + chain + " exists in " + table[1] + \ ". I cannot delete this chain" print_error(reason, filename, lineno) return -1 return 0 def set_add(set_info, table_list, filename, lineno): ''' Adds a set. ''' if not table_list: reason = "Missing table to add rule" print_error(reason, filename, lineno) return -1 for table in table_list: if set_exist(set_info[0], table, filename, lineno): reason = "This set " + set_info + " exists in " + table[1] + \ ". I cannot add it again" print_error(reason, filename, lineno) return -1 table_info = " " + table[0] + " " + table[1] + " " set_text = " " + set_info[0] + " { type " + set_info[1] + " \;}" cmd = NFT_BIN + " add set" + table_info + set_text ret = execute_cmd(cmd, filename, lineno) if (ret == 0 and set_info[2].rstrip() == "fail") or \ (ret != 0 and set_info[2].rstrip() == "ok"): reason = cmd + ": " + "I cannot add the set " + set_info[0] print_error(reason, filename, lineno) return -1 if not set_exist(set_info[0], table, filename, lineno): reason = "I have just added the set " + set_info[0] + \ " to the table " + table[1] + " but it does not exist" print_error(reason, filename, lineno) return -1 return 0 def set_add_elements(set_element, set_name, set_all, state, table_list, filename, lineno): ''' Adds elements to the set. ''' if not table_list: reason = "Missing table to add rules" print_error(reason, filename, lineno) return -1 for table in table_list: # Check if set exists. if (not set_exist(set_name, table, filename, lineno) or not set_name in set_all) and state == "ok": reason = "I cannot add an element to the set " + set_name + \ " since it does not exist." print_error(reason, filename, lineno) return -1 table_info = " " + table[0] + " " + table[1] + " " element = "" for e in set_element: if not element: element = e else: element = element + ", " + e set_text = set_name + " { " + element + " }" cmd = NFT_BIN + " add element" + table_info + set_text ret = execute_cmd(cmd, filename, lineno) if (state == "fail" and ret == 0) or (state == "ok" and ret != 0): test_state = "This rule should have failed." reason = cmd + ": " + test_state print_error(reason, filename, lineno) return -1 # Add element into a all_set. if (ret == 0 and state == "ok"): for e in set_element: set_all[set_name].add(e) return 0 def set_delete_elements(set_element, set_name, table, filename=None, lineno=None): ''' Deletes elements in a set. ''' table_info = " " + table[0] + " " + table[1] + " " for element in set_element: set_text = set_name + " {" + element + "}" cmd = NFT_BIN + " delete element" + table_info + set_text ret = execute_cmd(cmd, filename, lineno) if ret != 0: reason = "I cannot delete an element" + element + \ " from the set '" + set_name print_error(reason, filename, lineno) return -1 return 0 def set_delete(all_set, table, filename=None, lineno=None): ''' Deletes set and its content. ''' for set_name in all_set.keys(): # Check if exists the set if not set_exist(set_name, table, filename, lineno): reason = "The set " + set_name + \ " does not exist, I cannot delete it" print_error(reason, filename, lineno) return -1 # We delete all elements in the set set_delete_elements(all_set[set_name], set_name, table, filename, lineno) # We delete the set. table_info = " " + table[0] + " " + table[1] + " " cmd = NFT_BIN + " delete set " + table_info + " " + set_name ret = execute_cmd(cmd, filename, lineno) # Check if the set still exists after I deleted it. if ret != 0 or set_exist(set_name, table, filename, lineno): reason = "Cannot remove the set " + set_name print_error(reason, filename, lineno) return -1 return 0 def set_exist(set_name, table, filename, lineno): ''' Check if the set exists. ''' table_info = " " + table[0] + " " + table[1] + " " cmd = NFT_BIN + " list -nnn set" + table_info + set_name ret = execute_cmd(cmd, filename, lineno) return True if (ret == 0) else False def set_check_element(rule1, rule2): ''' Check if element exists in anonymous sets. ''' ret = -1 pos1 = rule1.find("{") pos2 = rule2.find("{") end1 = rule1.find("}") end2 = rule2.find("}") if ((pos1 != -1) and (pos2 != -1) and (end1 != -1) and (end2 != -1)): list1 = (rule1[pos1 + 1:end1].replace(" ", "")).split(",") list2 = (rule2[pos2 + 1:end2].replace(" ", "")).split(",") list1.sort() list2.sort() if (cmp(list1, list2) == 0): ret = 0 return ret def output_clean(pre_output, chain): pos_chain = pre_output[0].find(chain) if pos_chain == -1: return "" output_intermediate = pre_output[0][pos_chain:] brace_start = output_intermediate.find("{") brace_end = output_intermediate.find("}") pre_rule = output_intermediate[brace_start:brace_end] if pre_rule[1:].find("{") > -1: # this rule has a set. set = pre_rule[1:].replace("\t", "").replace("\n", "").strip() set = set.split(";")[2].strip() + "}" return set else: rule = pre_rule.split(";")[2].replace("\t", "").replace("\n", "").strip() if len(rule) < 0: return "" return rule def payload_check_elems_to_set(elems): newset = set() for n, line in enumerate(elems.split('[end]')): e = line.strip() if e in newset: print_error("duplicate", e, n) return newset newset.add(e) return newset def payload_check_set_elems(want, got): if want.find('element') < 0 or want.find('[end]') < 0: return 0 if got.find('element') < 0 or got.find('[end]') < 0: return 0 set_want = payload_check_elems_to_set(want) set_got = payload_check_elems_to_set(got) return set_want == set_got def payload_check(payload_buffer, file, cmd): file.seek(0, 0) ret = False i = 0 for lineno, want_line in enumerate(payload_buffer): line = file.readline() if want_line == line: i += 1 continue if want_line.find('[') < 0 and line.find('[') < 0: continue if want_line.find(']') < 0 and line.find(']') < 0: continue if payload_check_set_elems(want_line, line): continue print_differences_warning(file.name, lineno, want_line.strip(), line.strip(), cmd); return 0 return i > 0 def rule_add(rule, table_list, chain_list, filename, lineno, force_all_family_option, filename_path): ''' Adds a rule ''' # TODO Check if a rule is added correctly. ret = warning = error = unit_tests = 0 if not table_list or not chain_list: reason = "Missing table or chain to add rule." print_error(reason, filename, lineno) return [-1, warning, error, unit_tests] payload_expected = [] for table in table_list: try: payload_log = open("%s.payload.%s" % (filename_path, table[0])) except (IOError): payload_log = open("%s.payload" % filename_path) if rule[1].strip() == "ok": try: payload_expected.index(rule[0]) except (ValueError): payload_expected = payload_find_expected(payload_log, rule[0]) if payload_expected == []: print_error("did not find payload information for rule '%s'" % rule[0], payload_log.name, 1) for chain in chain_list: if len(rule) == 1: reason = "Skipping malformed test. (" + \ str(rule[0].rstrip('\n')) + ")" print_warning(reason, filename, lineno) continue unit_tests += 1 table_flush(table, filename, lineno) table_info = " " + table[0] + " " + table[1] + " " cmd = NFT_BIN + " add rule" + table_info + chain + " " + rule[0] payload_log = os.tmpfile(); cmd = NFT_BIN + " add rule --debug=netlink" + table_info + chain + " " + rule[0] ret = execute_cmd(cmd, filename, lineno, payload_log) state = rule[1].rstrip() if (ret == 0 and state == "fail") or (ret != 0 and state == "ok"): if state == "fail": test_state = "This rule should have failed." else: test_state = "This rule should not have failed." reason = cmd + ": " + test_state print_error(reason, filename, lineno) ret = -1 error += 1 if not force_all_family_option: return [ret, warning, error, unit_tests] if (state == "fail" and ret != 0): ret = 0 continue if ret == 0: # Check for matching payload if state == "ok" and not payload_check(payload_expected, payload_log, cmd): error += 1 gotf = open("%s.payload.got" % filename_path, 'a') payload_log.seek(0, 0) gotf.write("# %s\n" % rule[0]) while True: line = payload_log.readline() if line == "": break gotf.write(line) gotf.close() print_warning("Wrote payload for rule %s" % rule[0], gotf.name, 1) # Check output of nft process = subprocess.Popen([NFT_BIN, '-nnn', 'list', 'table'] + table, shell=False, stdout=subprocess.PIPE, preexec_fn=preexec) pre_output = process.communicate() output = pre_output[0].split(";") if len(output) < 2: reason = cmd + ": Listing is broken." print_error(reason, filename, lineno) ret = -1 error += 1 if not force_all_family_option: return [ret, warning, error, unit_tests] else: rule_output = output_clean(pre_output, chain) if (len(rule) == 3): teoric_exit = rule[2] else: teoric_exit = rule[0] if (rule_output.rstrip() != teoric_exit.rstrip()): if (rule[0].find("{") != -1): # anonymous sets if (set_check_element(teoric_exit, rule_output) != 0): warning += 1 print_differences_warning(filename, lineno, rule[0], rule_output, cmd) if not force_all_family_option: return [ret, warning, error, unit_tests] else: if len(rule_output) <= 0: error += 1 print_differences_error(filename, lineno, rule_output, cmd) if not force_all_family_option: return [ret, warning, error, unit_tests] warning += 1 print_differences_warning(filename, lineno, teoric_exit.rstrip(), rule_output, cmd) if not force_all_family_option: return [ret, warning, error, unit_tests] return [ret, warning, error, unit_tests] def preexec(): os.setpgrp() # Don't forward signals. def cleanup_on_exit(): for table in table_list: for chain in chain_list: ret = chain_delete(chain, table, "", "") if all_set: ret = set_delete(all_set, table) ret = table_delete(table) def signal_handler(signal, frame): global signal_received signal_received = 1 def execute_cmd(cmd, filename, lineno, stdout_log = False): ''' Executes a command, checks for segfaults and returns the command exit code. :param cmd: string with the command to be executed :param filename: name of the file tested (used for print_error purposes) :param lineno: line number being tested (used for print_error purposes) ''' global log_file print >> log_file, "command: %s" % cmd if debug_option: print cmd if not stdout_log: stdout_log = log_file ret = subprocess.call(cmd, shell=True, universal_newlines=True, stderr=log_file, stdout=stdout_log, preexec_fn=preexec) log_file.flush() if ret == -11: reason = "command segfaults: " + cmd print_error(reason, filename, lineno) return ret def print_result(filename, tests, warning, error): return str(filename) + ": " + str(tests) + " unit tests, " + \ str(error) + " error, " + str(warning) + " warning" def print_result_all(filename, tests, warning, error, unit_tests): return str(filename) + ": " + str(tests) + " unit tests, " +\ str(unit_tests) + " total test executed, " + \ str(error) + " error, " + \ str(warning) + " warning" def table_process(table_line, filename, lineno): if ";" in table_line: table_info = table_line.split(";") else: table_info.append("ip") table_info.append(table_line) return table_create(table_info, filename, lineno) def chain_process(chain_line, filename, lineno): chain_name = chain_line[0] chain_type = "" for table in table_list: if len(chain_line) > 1: chain_type = chain_line[1] ret = chain_create(chain_name, chain_type, chain_list, table, filename, lineno) if ret != 0: return -1 return ret def set_process(set_line, filename, lineno): set_info = [] set_name = "".join(set_line[0].rstrip()[1:]) set_info.append(set_name) set_type = set_line[1].split(";")[0] set_state = set_line[1].split(";")[1] # ok or fail set_info.append(set_type) set_info.append(set_state) ret = set_add(set_info, table_list, filename, lineno) if ret == 0: all_set[set_name] = set() return ret def set_element_process(element_line, filename, lineno): rule_state = element_line[1] set_name = element_line[0].split(" ")[0] set_element = element_line[0].split(" ") set_element.remove(set_name) return set_add_elements(set_element, set_name, all_set, rule_state, table_list, filename, lineno) def payload_find_expected(payload_log, rule): ''' Find the netlink payload that should be generated by given rule in payload_log :param payload_log: open file handle of the payload data :param rule: nft rule we are going to add ''' found = 0 pos = 0 payload_buffer = [] while True: line = payload_log.readline() if not line: break if line[0] == "#": # rule start rule_line = line.strip()[2:] if rule_line == rule.strip(): found = 1 continue if found == 1: payload_buffer.append(line) if line.isspace(): return payload_buffer payload_log.seek(0, 0) return payload_buffer def run_test_file(filename, force_all_family_option, specific_file): ''' Runs a test file :param filename: name of the file with the test rules ''' if specific_file: filename_path = os.path.join(TERMINAL_PATH, filename) else: filename_path = os.path.join(TESTS_PATH, filename) f = open(filename_path) tests = passed = total_unit_run = total_warning = total_error = 0 table = "" total_test_passed = True for lineno, line in enumerate(f): if signal_received == 1: print "\nSignal received. Cleaning up and Exitting..." cleanup_on_exit() sys.exit(0) if line.isspace(): continue if line[0] == "#": # Command-line continue if line[0] == '*': # Table table_line = line.rstrip()[1:] ret = table_process(table_line, filename, lineno) if (ret != 0): total_test_passed = False break continue if line[0] == ":": # Chain chain_line = line.rstrip()[1:].split(";") ret = chain_process(chain_line, filename, lineno) if ret != 0: total_test_passed = False break continue if line[0] == "!": # Adds this set set_line = line.rstrip()[0:].split(" ") ret = set_process(set_line, filename, lineno) tests += 1 if ret == -1: total_test_passed = False continue passed += 1 continue if line[0] == "?": # Adds elements in a set element_line = line.rstrip()[1:].split(";") ret = set_element_process(element_line, filename, lineno) tests += 1 if ret == -1: total_test_passed = False continue passed += 1 continue # Rule rule = line.split(';') # rule[1] Ok or FAIL if line[0] == "-": # Run omitted lines if need_fix_option: rule[0] = rule[0].rstrip()[1:].strip() else: continue elif need_fix_option: continue result = rule_add(rule, table_list, chain_list, filename, lineno, force_all_family_option, filename_path) tests += 1 ret = result[0] warning = result[1] total_warning += warning total_error += result[2] total_unit_run += result[3] if ret != 0: total_test_passed = False continue if warning == 0: # All ok. passed += 1 # Delete rules, sets, chains and tables for table in table_list: # We delete chains for chain in chain_list: ret = chain_delete(chain, table, filename, lineno) if ret != 0: total_test_passed = False # We delete sets. if all_set: ret = set_delete(all_set, table, filename, lineno) if ret != 0: total_test_passed = False reason = "There is a problem when we delete a set" print_error(reason, filename, lineno) # We delete tables. ret = table_delete(table, filename, lineno) if ret != 0: total_test_passed = False if specific_file: if force_all_family_option: print print_result_all(filename, tests, total_warning, total_error, total_unit_run) else: print print_result(filename, tests, total_warning, total_error) else: if (tests == passed and tests > 0): print filename + ": " + Colors.GREEN + "OK" + Colors.ENDC f.close() del table_list[:] del chain_list[:] all_set.clear() return [tests, passed, total_warning, total_error, total_unit_run] def main(): parser = argparse.ArgumentParser(description='Run nft tests', version='1.0') parser.add_argument('filename', nargs='?', metavar='path/to/file.t', help='Run only this test') parser.add_argument('-d', '--debug', action='store_true', dest='debug', help='enable debugging mode') parser.add_argument('-e', '--need-fix', action='store_true', dest='need_fix_line', help='run rules that need a fix') parser.add_argument('-f', '--force-family', action='store_true', dest='force_all_family', help='keep testing all families on error') args = parser.parse_args() global debug_option, need_fix_option debug_option = args.debug need_fix_option = args.need_fix_line force_all_family_option = args.force_all_family specific_file = False signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) if os.getuid() != 0: print "You need to be root to run this, sorry" return # Change working directory to repository root os.chdir(TESTS_PATH + "/../..") if not os.path.isfile(NFT_BIN): print "The nft binary does not exist. You need to build the project." return test_files = files_ok = run_total = 0 tests = passed = warnings = errors = 0 global log_file try: log_file = open(LOGFILE, 'w') except IOError: print "Cannot open log file %s" % LOGFILE return file_list = [] if args.filename: file_list = [args.filename] specific_file = True else: for directory in TESTS_DIRECTORY: path = os.path.join(TESTS_PATH, directory) for root, dirs, files in os.walk(path): for f in files: if f.endswith(".t"): file_list.append(os.path.join(directory, f)) for filename in file_list: result = run_test_file(filename, force_all_family_option, specific_file) file_tests = result[0] file_passed = result[1] file_warnings = result[2] file_errors = result[3] file_unit_run = result[4] test_files += 1 if file_warnings == 0 and file_tests == file_passed: files_ok += 1 if file_tests: tests += file_tests passed += file_passed errors += file_errors warnings += file_warnings if force_all_family_option: run_total += file_unit_run if test_files == 0: print "No test files to run" else: if not specific_file: if force_all_family_option: print ("%d test files, %d files passed, %d unit tests, %d total executed, %d error, %d warning" % (test_files, files_ok, tests, run_total, errors, warnings)) else: print ("%d test files, %d files passed, %d unit tests, %d error, %d warning" % (test_files, files_ok, tests, errors, warnings)) if __name__ == '__main__': main()