diff options
Diffstat (limited to 'py')
-rw-r--r-- | py/.gitignore | 1 | ||||
-rw-r--r-- | py/nftables.py | 224 |
2 files changed, 225 insertions, 0 deletions
diff --git a/py/.gitignore b/py/.gitignore new file mode 100644 index 00000000..0d20b648 --- /dev/null +++ b/py/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/py/nftables.py b/py/nftables.py new file mode 100644 index 00000000..c1759750 --- /dev/null +++ b/py/nftables.py @@ -0,0 +1,224 @@ +import json +from ctypes import * +import sys + +class Nftables: + """A class representing libnftables interface""" + + debug_flags = { + "scanner": 0x1, + "parser": 0x2, + "eval": 0x4, + "netlink": 0x8, + "mnl": 0x10, + "proto-ctx": 0x20, + "segtree": 0x40, + } + + numeric_levels = { + "none": 0, + "addr": 1, + "port": 2, + "all": 3, + } + + def __init__(self, sofile="libnftables.so"): + """Instantiate a new Nftables class object. + + Accepts a shared object file to open, by default standard search path + is searched for a file named 'libnftables.so'. + + After loading the library using ctypes module, a new nftables context + is requested from the library and buffering of output and error streams + is turned on. + """ + lib = cdll.LoadLibrary(sofile) + + ### API function definitions + + self.nft_ctx_new = lib.nft_ctx_new + self.nft_ctx_new.restype = c_void_p + self.nft_ctx_new.argtypes = [c_int] + + self.nft_ctx_output_get_handle = lib.nft_ctx_output_get_handle + self.nft_ctx_output_get_handle.restype = c_bool + self.nft_ctx_output_get_handle.argtypes = [c_void_p] + + self.nft_ctx_output_set_handle = lib.nft_ctx_output_set_handle + self.nft_ctx_output_set_handle.argtypes = [c_void_p, c_bool] + + self.nft_ctx_output_get_numeric = lib.nft_ctx_output_get_numeric + self.nft_ctx_output_get_numeric.restype = c_int + self.nft_ctx_output_get_numeric.argtypes = [c_void_p] + + self.nft_ctx_output_set_numeric = lib.nft_ctx_output_set_numeric + self.nft_ctx_output_set_numeric.argtypes = [c_void_p, c_int] + + self.nft_ctx_output_get_stateless = lib.nft_ctx_output_get_stateless + self.nft_ctx_output_get_stateless.restype = c_bool + self.nft_ctx_output_get_stateless.argtypes = [c_void_p] + + self.nft_ctx_output_set_stateless = lib.nft_ctx_output_set_stateless + self.nft_ctx_output_set_stateless.argtypes = [c_void_p, c_bool] + + self.nft_ctx_output_get_debug = lib.nft_ctx_output_get_debug + self.nft_ctx_output_get_debug.restype = c_int + self.nft_ctx_output_get_debug.argtypes = [c_void_p] + + self.nft_ctx_output_set_debug = lib.nft_ctx_output_set_debug + self.nft_ctx_output_set_debug.argtypes = [c_void_p, c_int] + + self.nft_ctx_buffer_output = lib.nft_ctx_buffer_output + self.nft_ctx_buffer_output.restype = c_int + self.nft_ctx_buffer_output.argtypes = [c_void_p] + + self.nft_ctx_get_output_buffer = lib.nft_ctx_get_output_buffer + self.nft_ctx_get_output_buffer.restype = c_char_p + self.nft_ctx_get_output_buffer.argtypes = [c_void_p] + + self.nft_ctx_buffer_error = lib.nft_ctx_buffer_error + self.nft_ctx_buffer_error.restype = c_int + self.nft_ctx_buffer_error.argtypes = [c_void_p] + + self.nft_ctx_get_error_buffer = lib.nft_ctx_get_error_buffer + self.nft_ctx_get_error_buffer.restype = c_char_p + self.nft_ctx_get_error_buffer.argtypes = [c_void_p] + + self.nft_run_cmd_from_buffer = lib.nft_run_cmd_from_buffer + self.nft_run_cmd_from_buffer.restype = c_int + self.nft_run_cmd_from_buffer.argtypes = [c_void_p, c_char_p, c_int] + + self.nft_ctx_free = lib.nft_ctx_free + lib.nft_ctx_free.argtypes = [c_void_p] + + # initialize libnftables context + self.__ctx = self.nft_ctx_new(0) + self.nft_ctx_buffer_output(self.__ctx) + self.nft_ctx_buffer_error(self.__ctx) + + def get_handle_output(self): + """Get the current state of handle output. + + Returns a boolean indicating whether handle output is active or not. + """ + return self.nft_ctx_output_get_handle(self.__ctx) + + def set_handle_output(self, val): + """Enable or disable handle output. + + Accepts a boolean turning handle output on or off. + + Returns the previous value. + """ + old = self.get_handle_output() + self.nft_ctx_output_set_handle(self.__ctx, val) + return old + + def get_numeric_output(self): + """Get the current state of numeric output. + + Returns a boolean indicating whether boolean output is active or not. + """ + return self.nft_ctx_output_get_numeric(self.__ctx) + + def set_numeric_output(self, val): + """Enable or disable numeric output. + + Accepts a boolean turning numeric output on or off. + + Returns the previous value. + """ + old = self.get_numeric_output() + + if type(val) is str: + val = self.numeric_levels[val] + self.nft_ctx_output_set_numeric(self.__ctx, val) + + return old + + def get_stateless_output(self): + """Get the current state of stateless output. + + Returns a boolean indicating whether stateless output is active or not. + """ + return self.nft_ctx_output_get_stateless(self.__ctx) + + def set_stateless_output(self, val): + """Enable or Disable stateless output. + + Accepts a boolean turning stateless output either on or off. + + Returns the previous value. + """ + old = self.get_stateless_output() + self.nft_ctx_output_set_stateless(self.__ctx, val) + return old + + def get_debug(self): + """Get currently active debug flags. + + Returns a set of flag names. See set_debug() for details. + """ + val = self.nft_ctx_output_get_debug(self.__ctx) + + names = [] + for n,v in self.debug_flags.items(): + if val & v: + names.append(n) + val &= ~v + if val: + names.append(val) + + return names + + def set_debug(self, values): + """Set debug output flags. + + Accepts either a single flag or a set of flags. Each flag might be + given either as string or integer value as shown in the following + table: + + Name | Value (hex) + ----------------------- + scanner | 0x1 + parser | 0x2 + eval | 0x4 + netlink | 0x8 + mnl | 0x10 + proto-ctx | 0x20 + segtree | 0x40 + + Returns a set of previously active debug flags, as returned by + get_debug() method. + """ + old = self.get_debug() + + if type(values) in [str, int]: + values = [values] + + val = 0 + for v in values: + if type(v) is str: + v = self.debug_flags[v] + val |= v + + self.nft_ctx_output_set_debug(self.__ctx, val) + + return old + + def cmd(self, cmdline): + """Run a simple nftables command via libnftables. + + Accepts a string containing an nftables command just like what one + would enter into an interactive nftables (nft -i) session. + + Returns a tuple (rc, output, error): + rc -- return code as returned by nft_run_cmd_from_buffer() fuction + output -- a string containing output written to stdout + error -- a string containing output written to stderr + """ + rc = self.nft_run_cmd_from_buffer(self.__ctx, cmdline, len(cmdline)) + output = self.nft_ctx_get_output_buffer(self.__ctx) + error = self.nft_ctx_get_error_buffer(self.__ctx) + + return (rc, output, error) |