diff options
Diffstat (limited to 'py/src')
-rw-r--r-- | py/src/__init__.py | 1 | ||||
-rw-r--r-- | py/src/nftables.py | 604 | ||||
-rw-r--r-- | py/src/schema.json | 16 |
3 files changed, 621 insertions, 0 deletions
diff --git a/py/src/__init__.py b/py/src/__init__.py new file mode 100644 index 00000000..7567f095 --- /dev/null +++ b/py/src/__init__.py @@ -0,0 +1 @@ +from .nftables import * diff --git a/py/src/nftables.py b/py/src/nftables.py new file mode 100644 index 00000000..f1e43ade --- /dev/null +++ b/py/src/nftables.py @@ -0,0 +1,604 @@ +#!/usr/bin/python +# Copyright(C) 2018 Phil Sutter <phil@nwl.cc> + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +import json +from ctypes import * +import sys +import os + +NFTABLES_VERSION = "0.1" + +class SchemaValidator: + """Libnftables JSON validator using jsonschema""" + + def __init__(self): + schema_path = os.path.join(os.path.dirname(__file__), "schema.json") + with open(schema_path, 'r') as schema_file: + self.schema = json.load(schema_file) + import jsonschema + self.jsonschema = jsonschema + + def validate(self, json): + self.jsonschema.validate(instance=json, schema=self.schema) + +class Nftables: + """A class representing libnftables interface""" + + input_flags = { + "no-dns": 0x1, + "json": 0x2, + } + + debug_flags = { + "scanner": 0x1, + "parser": 0x2, + "eval": 0x4, + "netlink": 0x8, + "mnl": 0x10, + "proto-ctx": 0x20, + "segtree": 0x40, + } + + output_flags = { + "reversedns": (1 << 0), + "service": (1 << 1), + "stateless": (1 << 2), + "handle": (1 << 3), + "json": (1 << 4), + "echo": (1 << 5), + "guid": (1 << 6), + "numeric_proto": (1 << 7), + "numeric_prio": (1 << 8), + "numeric_symbol": (1 << 9), + "numeric_time": (1 << 10), + "terse": (1 << 11), + } + + validator = None + + def __init__(self, sofile="libnftables.so.1"): + """Instantiate a new Nftables class object. + + Accepts a shared object file to open, by default standard search path + is searched for a file named 'libnftables.so'. + + After loading the library using ctypes module, a new nftables context + is requested from the library and buffering of output and error streams + is turned on. + """ + self.__ctx = None + + lib = cdll.LoadLibrary(sofile) + + ### API function definitions + + self.nft_ctx_new = lib.nft_ctx_new + self.nft_ctx_new.restype = c_void_p + self.nft_ctx_new.argtypes = [c_int] + + self.nft_ctx_input_get_flags = lib.nft_ctx_input_get_flags + self.nft_ctx_input_get_flags.restype = c_uint + self.nft_ctx_input_get_flags.argtypes = [c_void_p] + + self.nft_ctx_input_set_flags = lib.nft_ctx_input_set_flags + self.nft_ctx_input_set_flags.restype = c_uint + self.nft_ctx_input_set_flags.argtypes = [c_void_p, c_uint] + + self.nft_ctx_output_get_flags = lib.nft_ctx_output_get_flags + self.nft_ctx_output_get_flags.restype = c_uint + self.nft_ctx_output_get_flags.argtypes = [c_void_p] + + self.nft_ctx_output_set_flags = lib.nft_ctx_output_set_flags + self.nft_ctx_output_set_flags.argtypes = [c_void_p, c_uint] + + self.nft_ctx_output_get_debug = lib.nft_ctx_output_get_debug + self.nft_ctx_output_get_debug.restype = c_int + self.nft_ctx_output_get_debug.argtypes = [c_void_p] + + self.nft_ctx_output_set_debug = lib.nft_ctx_output_set_debug + self.nft_ctx_output_set_debug.argtypes = [c_void_p, c_int] + + self.nft_ctx_buffer_output = lib.nft_ctx_buffer_output + self.nft_ctx_buffer_output.restype = c_int + self.nft_ctx_buffer_output.argtypes = [c_void_p] + + self.nft_ctx_get_output_buffer = lib.nft_ctx_get_output_buffer + self.nft_ctx_get_output_buffer.restype = c_char_p + self.nft_ctx_get_output_buffer.argtypes = [c_void_p] + + self.nft_ctx_buffer_error = lib.nft_ctx_buffer_error + self.nft_ctx_buffer_error.restype = c_int + self.nft_ctx_buffer_error.argtypes = [c_void_p] + + self.nft_ctx_get_error_buffer = lib.nft_ctx_get_error_buffer + self.nft_ctx_get_error_buffer.restype = c_char_p + self.nft_ctx_get_error_buffer.argtypes = [c_void_p] + + self.nft_run_cmd_from_buffer = lib.nft_run_cmd_from_buffer + self.nft_run_cmd_from_buffer.restype = c_int + self.nft_run_cmd_from_buffer.argtypes = [c_void_p, c_char_p] + + self.nft_run_cmd_from_filename = lib.nft_run_cmd_from_filename + self.nft_run_cmd_from_filename.restype = c_int + self.nft_run_cmd_from_filename.argtypes = [c_void_p, c_char_p] + + self.nft_ctx_add_include_path = lib.nft_ctx_add_include_path + self.nft_ctx_add_include_path.restype = c_int + self.nft_ctx_add_include_path.argtypes = [c_void_p, c_char_p] + + self.nft_ctx_clear_include_paths = lib.nft_ctx_clear_include_paths + self.nft_ctx_clear_include_paths.argtypes = [c_void_p] + + self.nft_ctx_get_dry_run = lib.nft_ctx_get_dry_run + self.nft_ctx_get_dry_run.restype = c_bool + self.nft_ctx_get_dry_run.argtypes = [c_void_p] + + self.nft_ctx_set_dry_run = lib.nft_ctx_set_dry_run + self.nft_ctx_set_dry_run.argtypes = [c_void_p, c_bool] + + self.nft_ctx_add_var = lib.nft_ctx_add_var + self.nft_ctx_add_var.restype = c_int + self.nft_ctx_add_var.argtypes = [c_void_p, c_char_p] + + self.nft_ctx_clear_vars = lib.nft_ctx_clear_vars + self.nft_ctx_clear_vars.argtypes = [c_void_p] + + self.nft_ctx_free = lib.nft_ctx_free + lib.nft_ctx_free.argtypes = [c_void_p] + + # initialize libnftables context + self.__ctx = self.nft_ctx_new(0) + self.nft_ctx_buffer_output(self.__ctx) + self.nft_ctx_buffer_error(self.__ctx) + + def __del__(self): + if self.__ctx is not None: + self.nft_ctx_free(self.__ctx) + self.__ctx = None + + def _flags_from_numeric(self, flags_dict, val): + names = [] + for n, v in flags_dict.items(): + if val & v: + names.append(n) + val &= ~v + if val: + names.append(val) + return names + + def _flags_to_numeric(self, flags_dict, values): + if isinstance(values, (str, int)): + values = (values,) + + val = 0 + for v in values: + if isinstance(v, str): + v = flags_dict.get(v) + if v is None: + raise ValueError("Invalid argument") + elif isinstance(v, int): + if v < 0 or v > 0xFFFFFFFF: + raise ValueError("Invalid argument") + else: + raise TypeError("Not a valid flag") + val |= v + + return val + + def get_input_flags(self): + """Get currently active input flags. + + Returns a set of flag names. See set_input_flags() for details. + """ + val = self.nft_ctx_input_get_flags(self.__ctx) + return self._flags_from_numeric(self.input_flags, val) + + def set_input_flags(self, values): + """Set input flags. + + Resets all input flags to values. Accepts either a single flag or a list + of flags. Each flag might be given either as string or integer value as + shown in the following table: + + Name | Value (hex) + ----------------------- + "no-dns" | 0x1 + "json" | 0x2 + + "no-dns" disables blocking address lookup. + "json" enables JSON mode for input. + + Returns a set of previously active input flags, as returned by + get_input_flags() method. + """ + val = self._flags_to_numeric(self.input_flags, values) + old = self.nft_ctx_input_set_flags(self.__ctx, val) + return self._flags_from_numeric(self.input_flags, old) + + def __get_output_flag(self, name): + flag = self.output_flags[name] + return (self.nft_ctx_output_get_flags(self.__ctx) & flag) != 0 + + def __set_output_flag(self, name, val): + flag = self.output_flags[name] + flags = self.nft_ctx_output_get_flags(self.__ctx) + if val: + new_flags = flags | flag + else: + new_flags = flags & ~flag + self.nft_ctx_output_set_flags(self.__ctx, new_flags) + return (flags & flag) != 0 + + def get_reversedns_output(self): + """Get the current state of reverse DNS output. + + Returns a boolean indicating whether reverse DNS lookups are performed + for IP addresses in output. + """ + return self.__get_output_flag("reversedns") + + def set_reversedns_output(self, val): + """Enable or disable reverse DNS output. + + Accepts a boolean turning reverse DNS lookups in output on or off. + + Returns the previous value. + """ + return self.__set_output_flag("reversedns", val) + + def get_service_output(self): + """Get the current state of service name output. + + Returns a boolean indicating whether service names are used for port + numbers in output or not. + """ + return self.__get_output_flag("service") + + def set_service_output(self, val): + """Enable or disable service name output. + + Accepts a boolean turning service names for port numbers in output on + or off. + + Returns the previous value. + """ + return self.__set_output_flag("service", val) + + def get_stateless_output(self): + """Get the current state of stateless output. + + Returns a boolean indicating whether stateless output is active or not. + """ + return self.__get_output_flag("stateless") + + def set_stateless_output(self, val): + """Enable or disable stateless output. + + Accepts a boolean turning stateless output either on or off. + + Returns the previous value. + """ + return self.__set_output_flag("stateless", val) + + def get_handle_output(self): + """Get the current state of handle output. + + Returns a boolean indicating whether handle output is active or not. + """ + return self.__get_output_flag("handle") + + def set_handle_output(self, val): + """Enable or disable handle output. + + Accepts a boolean turning handle output on or off. + + Returns the previous value. + """ + return self.__set_output_flag("handle", val) + + def get_json_output(self): + """Get the current state of JSON output. + + Returns a boolean indicating whether JSON output is active or not. + """ + return self.__get_output_flag("json") + + def set_json_output(self, val): + """Enable or disable JSON output. + + Accepts a boolean turning JSON output either on or off. + + Returns the previous value. + """ + return self.__set_output_flag("json", val) + + def get_echo_output(self): + """Get the current state of echo output. + + Returns a boolean indicating whether echo output is active or not. + """ + return self.__get_output_flag("echo") + + def set_echo_output(self, val): + """Enable or disable echo output. + + Accepts a boolean turning echo output on or off. + + Returns the previous value. + """ + return self.__set_output_flag("echo", val) + + def get_guid_output(self): + """Get the current state of GID/UID output. + + Returns a boolean indicating whether names for group/user IDs are used + in output or not. + """ + return self.__get_output_flag("guid") + + def set_guid_output(self, val): + """Enable or disable GID/UID output. + + Accepts a boolean turning names for group/user IDs on or off. + + Returns the previous value. + """ + return self.__set_output_flag("guid", val) + + def get_numeric_proto_output(self): + """Get current status of numeric protocol output flag. + + Returns a boolean value indicating the status. + """ + return self.__get_output_flag("numeric_proto") + + def set_numeric_proto_output(self, val): + """Set numeric protocol output flag. + + Accepts a boolean turning numeric protocol output either on or off. + + Returns the previous value. + """ + return self.__set_output_flag("numeric_proto", val) + + def get_numeric_prio_output(self): + """Get current status of numeric chain priority output flag. + + Returns a boolean value indicating the status. + """ + return self.__get_output_flag("numeric_prio") + + def set_numeric_prio_output(self, val): + """Set numeric chain priority output flag. + + Accepts a boolean turning numeric chain priority output either on or + off. + + Returns the previous value. + """ + return self.__set_output_flag("numeric_prio", val) + + def get_numeric_symbol_output(self): + """Get current status of numeric symbols output flag. + + Returns a boolean value indicating the status. + """ + return self.__get_output_flag("numeric_symbol") + + def set_numeric_symbol_output(self, val): + """Set numeric symbols output flag. + + Accepts a boolean turning numeric representation of symbolic constants + in output either on or off. + + Returns the previous value. + """ + return self.__set_output_flag("numeric_symbol", val) + + def get_numeric_time_output(self): + """Get current status of numeric times output flag. + + Returns a boolean value indicating the status. + """ + return self.__get_output_flag("numeric_time") + + def set_numeric_time_output(self, val): + """Set numeric times output flag. + + Accepts a boolean turning numeric representation of time values + in output either on or off. + + Returns the previous value. + """ + return self.__set_output_flag("numeric_time", val) + + def get_terse_output(self): + """Get the current state of terse output. + + Returns a boolean indicating whether terse output is active or not. + """ + return self.__get_output_flag("terse") + + def set_terse_output(self, val): + """Enable or disable terse output. + + Accepts a boolean turning terse output either on or off. + + Returns the previous value. + """ + return self.__set_output_flag("terse", val) + + def get_debug(self): + """Get currently active debug flags. + + Returns a set of flag names. See set_debug() for details. + """ + val = self.nft_ctx_output_get_debug(self.__ctx) + return self._flags_from_numeric(self.debug_flags, val) + + def set_debug(self, values): + """Set debug output flags. + + Accepts either a single flag or a set of flags. Each flag might be + given either as string or integer value as shown in the following + table: + + Name | Value (hex) + ----------------------- + scanner | 0x1 + parser | 0x2 + eval | 0x4 + netlink | 0x8 + mnl | 0x10 + proto-ctx | 0x20 + segtree | 0x40 + + Returns a set of previously active debug flags, as returned by + get_debug() method. + """ + val = self._flags_to_numeric(self.debug_flags, values) + old = self.get_debug() + self.nft_ctx_output_set_debug(self.__ctx, val) + return old + + def cmd(self, cmdline): + """Run a simple nftables command via libnftables. + + Accepts a string containing an nftables command just like what one + would enter into an interactive nftables (nft -i) session. + + Returns a tuple (rc, output, error): + rc -- return code as returned by nft_run_cmd_from_buffer() fuction + output -- a string containing output written to stdout + error -- a string containing output written to stderr + """ + cmdline_is_unicode = False + if not isinstance(cmdline, bytes): + cmdline_is_unicode = True + cmdline = cmdline.encode("utf-8") + rc = self.nft_run_cmd_from_buffer(self.__ctx, cmdline) + output = self.nft_ctx_get_output_buffer(self.__ctx) + error = self.nft_ctx_get_error_buffer(self.__ctx) + if cmdline_is_unicode: + output = output.decode("utf-8") + error = error.decode("utf-8") + + return (rc, output, error) + + def json_cmd(self, json_root): + """Run an nftables command in JSON syntax via libnftables. + + Accepts a hash object as input. + + Returns a tuple (rc, output, error): + rc -- return code as returned by nft_run_cmd_from_buffer() function + output -- a hash object containing library standard output + error -- a string containing output written to stderr + """ + json_out_old = self.set_json_output(True) + rc, output, error = self.cmd(json.dumps(json_root)) + if not json_out_old: + self.set_json_output(json_out_old) + if len(output): + output = json.loads(output) + return (rc, output, error) + + def json_validate(self, json_root): + """Validate JSON object against libnftables schema. + + Accepts a hash object as input. + + Returns True if JSON is valid, raises an exception otherwise. + """ + if not self.validator: + self.validator = SchemaValidator() + + self.validator.validate(json_root) + return True + + def cmd_from_file(self, filename): + """Run a nftables command set from a file + + filename can be a str or a Path + + Returns a tuple (rc, output, error): + rc -- return code as returned by nft_run_cmd_from_filename() function + output -- a string containing output written to stdout + error -- a string containing output written to stderr + """ + filename_is_unicode = False + if not isinstance(filename, bytes): + filename_is_unicode = True + filename = str(filename) + filename= filename.encode("utf-8") + rc = self.nft_run_cmd_from_filename(self.__ctx, filename) + output = self.nft_ctx_get_output_buffer(self.__ctx) + error = self.nft_ctx_get_error_buffer(self.__ctx) + if filename_is_unicode: + output = output.decode("utf-8") + error = error.decode("utf-8") + return (rc, output, error) + + def add_include_path(self, filename): + """Add a path to the include file list + The default list includes the built-in default one + + Returns True on success, False if memory allocation fails + """ + if not isinstance(filename, bytes): + filename = str(filename) + filename= filename.encode("utf-8") + rc = self.nft_ctx_add_include_path(self.__ctx, filename) + return rc == 0 + + def clear_include_paths(self): + """Clear include path list + + Will also remove the built-in default one + """ + self.nft_ctx_clear_include_paths(self.__ctx) + + def get_dry_run(self): + """Get dry run state + + Returns True if set, False otherwise + """ + return self.nft_ctx_get_dry_run(self.__ctx) + + def set_dry_run(self, onoff): + """ Set dry run state + + Returns the previous dry run state + """ + old = self.get_dry_run() + self.nft_ctx_set_dry_run(self.__ctx, onoff) + + return old + + def add_var(self, var): + """Add a variable to the variable list + + Returns True if added, False otherwise + """ + if not isinstance(var, bytes): + var = var.encode("utf-8") + rc = self.nft_ctx_add_var(self.__ctx, var) + return rc == 0 + + def clear_vars(self): + """Clear variable list + """ + self.nft_ctx_clear_vars(self.__ctx) diff --git a/py/src/schema.json b/py/src/schema.json new file mode 100644 index 00000000..460e2156 --- /dev/null +++ b/py/src/schema.json @@ -0,0 +1,16 @@ +{ + "$schema": "http://json-schema.org/schema#", + "description": "libnftables JSON API schema", + + "type": "object", + "properties": { + "nftables": { + "type": "array", + "minitems": 0, + "items": { + "type": "object" + } + } + }, + "required": [ "nftables" ] +} |