diff options
Diffstat (limited to 'output/pgsql/ulogd_output_PGSQL.c')
-rw-r--r-- | output/pgsql/ulogd_output_PGSQL.c | 435 |
1 files changed, 435 insertions, 0 deletions
diff --git a/output/pgsql/ulogd_output_PGSQL.c b/output/pgsql/ulogd_output_PGSQL.c new file mode 100644 index 0000000..70e07df --- /dev/null +++ b/output/pgsql/ulogd_output_PGSQL.c @@ -0,0 +1,435 @@ +/* ulogd_PGSQL.c, Version $Revision$ + * + * ulogd output plugin for logging to a PGSQL database + * + * (C) 2000-2005 by Harald Welte <laforge@gnumonks.org> + * This software is distributed under the terms of GNU GPL + * + * This plugin is based on the MySQL plugin made by Harald Welte. + * The support PostgreSQL were made by Jakab Laszlo. + * + */ + +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <arpa/inet.h> +#include <libpq-fe.h> +#include <ulogd/ulogd.h> +#include <ulogd/conffile.h> + +#include "../../utils/db.c" + +#ifdef DEBUG_PGSQL +#define DEBUGP(x, args...) fprintf(stderr, x, ## args) +#else +#define DEBUGP(x, args...) +#endif + +struct pgsql_instance { + struct db_instance db_inst; + + PGconn *dbh; + PGresult *pgres; + unsigned char pgsql_have_schemas; +} +#define TIME_ERR ((time_t)-1) + +/* our configuration directives */ +static struct config_keyset pgsql_kset = { + .num_ces = DB_CE_NUM + 6, + .ces = { + DB_CES, + { + .key = "db", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "host", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + }, + { + .key = "user", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "pass", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_NONE, + }, + { + .next = &schema_ce, + .key = "port", + .type = CONFIG_TYPE_INT, + }, + { + .key = "schema", + .type = CONFIG_TYPE_STRING, + .u.string = "public", + }, + }, +}; +#define db_ce(x) (x->ces[DB_CE_NUM+0]) +#define host_ce(x) (x->ces[DB_CE_NUM+1]) +#define user_ce(x) (x->ces[DB_CE_NUM+2]) +#define pass_ce(x) (x->ces[DB_CE_NUM+3]) +#define port_ce(x) (x->ces[DB_CE_NUM+5]) +#define schema_ce(x) (x->ces[DB_CE_NUM+6]) + +#if 0 +/* our main output function, called by ulogd */ +static int pgsql_output(ulog_iret_t *result) +{ + struct _field *f; + ulog_iret_t *res; + PGresult *pgres; +#ifdef IP_AS_STRING + char *tmpstr; /* need this for --log-ip-as-string */ + struct in_addr addr; +#endif + + stmt_ins = stmt_val; + + for (f = fields; f; f = f->next) { + res = keyh_getres(f->id); + + if (!res) { + ulogd_log(ULOGD_NOTICE, + "no result for %s ?!?\n", f->name); + } + + if (!res || !IS_VALID((*res))) { + /* no result, we have to fake something */ + sprintf(stmt_ins, "NULL,"); + stmt_ins = stmt + strlen(stmt); + continue; + } + + switch (res->type) { + case ULOGD_RET_INT8: + sprintf(stmt_ins, "%d,", res->value.i8); + break; + case ULOGD_RET_INT16: + sprintf(stmt_ins, "%d,", res->value.i16); + break; + case ULOGD_RET_INT32: + sprintf(stmt_ins, "%d,", res->value.i32); + break; + case ULOGD_RET_INT64: + sprintf(stmt_ins, "%lld,", res->value.i64); + break; + case ULOGD_RET_UINT8: + sprintf(stmt_ins, "%u,", res->value.ui8); + break; + case ULOGD_RET_UINT16: + sprintf(stmt_ins, "%u,", res->value.ui16); + break; + case ULOGD_RET_IPADDR: +#ifdef IP_AS_STRING + *stmt_ins++ = '\''; + memset(&addr, 0, sizeof(addr)); + addr.s_addr = ntohl(res->value.ui32); + tmpstr = (char *)inet_ntoa(addr); + PQescapeString(stmt_ins,tmpstr,strlen(tmpstr)); + stmt_ins = stmt + strlen(stmt); + sprintf(stmt_ins, "',"); + break; +#endif /* IP_AS_STRING */ + /* EVIL: fallthrough when logging IP as + * u_int32_t */ + + case ULOGD_RET_UINT32: + sprintf(stmt_ins, "%u,", res->value.ui32); + break; + case ULOGD_RET_UINT64: + sprintf(stmt_ins, "%llu,", res->value.ui64); + break; + case ULOGD_RET_BOOL: + sprintf(stmt_ins, "'%d',", res->value.b); + break; + case ULOGD_RET_STRING: + *stmt_ins++ = '\''; + PQescapeString(stmt_ins,res->value.ptr,strlen(res->value.ptr)); + stmt_ins = stmt + strlen(stmt); + sprintf(stmt_ins, "',"); + break; + case ULOGD_RET_RAW: + ulogd_log(ULOGD_NOTICE,"%s: pgsql doesn't support type RAW\n",res->key); + sprintf(stmt_ins, "NULL,"); + break; + default: + ulogd_log(ULOGD_NOTICE, + "unknown type %d for %s\n", + res->type, res->key); + break; + } + stmt_ins = stmt + strlen(stmt); + } + *(stmt_ins - 1) = ')'; + DEBUGP("stmt=#%s#\n", stmt); + + /* now we have created our statement, insert it */ + /* Added code by Jaki */ + pgres = PQexec(dbh, stmt); + if(!pgres || PQresultStatus(pgres) != PGRES_COMMAND_OK) { + ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", + PQresultErrorMessage(pgres)); + return 1; + } + + return 0; +} +#endif + +#define PGSQL_HAVE_NAMESPACE_TEMPLATE "SELECT nspname FROM pg_namespace n WHERE n.nspname='%s'" + +/* Determine if server support schemas */ +static int pgsql_namespace(void) +{ + PGresult *result; + char pgbuf[strlen(PGSQL_HAVE_NAMESPACE_TEMPLATE)+strlen(schema_ce.u.string)+1]; + + if (!dbh) + return 1; + + sprintf(pgbuf, PGSQL_HAVE_NAMESPACE_TEMPLATE, schema_ce.u.string); + ulogd_log(ULOGD_DEBUG, "%s\n", pgbuf); + + result = PQexec(dbh, pgbuf); + if (!result) { + ulogd_log(ULOGD_DEBUG, "\n result false"); + return 1; + } + + if (PQresultStatus(result) == PGRES_TUPLES_OK) { + ulogd_log(ULOGD_DEBUG, "using schema %s\n", schema_ce.u.string); + pgsql_have_schemas = 1; + } else { + pgsql_have_schemas = 0; + } + + PQclear(result); + + return 0; +} + +#define PGSQL_GETCOLUMN_TEMPLATE "SELECT a.attname FROM pg_class c, pg_attribute a WHERE c.relname ='%s' AND a.attnum>0 AND a.attrelid=c.oid ORDER BY a.attnum" + +#define PGSQL_GETCOLUMN_TEMPLATE_SCHEMA "SELECT a.attname FROM pg_attribute a, pg_class c LEFT JOIN pg_namespace n ON c.relnamespace=n.oid WHERE c.relname ='%s' AND n.nspname='%s' AND a.attnum>0 AND a.attrelid=c.oid AND a.attisdropped=FALSE ORDER BY a.attnum" + +/* find out which columns the table has */ +static int get_columns_pgsql(struct ulogd_pluginstance *upi) +{ + struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; + PGresult *result; + char pgbuf[strlen(PGSQL_GETCOLUMN_TEMPLATE_SCHEMA) + + strlen(table) + strlen(schema_ce.u.string) + 2]; + int intaux; + + if (!pi->dbh) { + ulogd_log(ULOGD_ERROR, "no database handle\n"); + return 1; + } + + if (pgsql_have_schemas) { + snprintf(pgbuf, sizeof(pgbuf)-1, PGSQL_GETCOLUMN_TEMPLATE_SCHEMA, + table_ce(upi->config_kset).u.string, + schema_ce(upi->config_kset).u.string); + } else { + snprintf(pgbuf, sizeof(pgbuf)-1, PGSQL_GETCOLUMN_TEMPLATE, + table_ce(upi->config_kset).u.string); + } + + ulogd_log(ULOGD_DEBUG, "%s\n", pgbuf); + + result = PQexec(dbh, pgbuf); + if (!result) { + ulogd_log(ULOGD_DEBUG, "result false"); + return -1; + } + + if (PQresultStatus(result) != PGRES_TUPLES_OK) { + ulogd_log(ULOGD_DEBUG, "pres_command_not_ok"); + return -1; + } + + if (upi->input.keys) + free(upi->input.keys); + + upi->input.num_keys = PQntuples(result); + ulogd_log(ULOGD_DEBUG, "%u fields in table\n", upi->input.num_keys); + upi->input.keys = malloc(sizeof(struct ulogd_key) * + upi->input.num_keys); + if (!upi->input.keys) { + upi->input.num_keys = 0; + ulogd_log(ULOGD_ERROR, "ENOMEM\n"); + return -ENOMEM; + } + + memset(upi->input.keys, 0, sizeof(struct ulogd_key) * + upi->input.num_keys); + + for (intaux = 0; intaux < PQntuples(result); intaux++) { + char buf[ULOGD_MAX_KEYLEN+1]; + char *underscore; + int id; + + /* replace all underscores with dots */ + strncpy(buf, PQgetvalue(result, intaux, 0), ULOGD_MAX_KEYLEN); + while ((underscore = strchr(buf, '_'))) + *underscore = '.'; + + DEBUGP("field '%s' found: ", buf); + + /* add it to list of input keys */ + strncpy(upi->input.keys[i].name, buf, ULOGD_MAX_KEYLEN); + } + + /* FIXME: id? */ + + PQclear(result); + return 0; +} + +static int close_db_pgsql(struct ulogd_pluginstance *upi) +{ + struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; + + return PQfinish(pi->dbh); +} + +/* make connection and select database */ +static int open_db_pgsql(struct ulogd_pluginstance *upi) +{ + struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; + int len; + char *connstr; + char *server = host_ce(upi->config_kset).u.string; + char *port = port_ce(upi->config_kset).u.string; + char *user = user_ce(upi->config_kset).u.string; + char *pass = pass_ce(upi->config_kset).u.string; + char *db = db_ce(upi->config_kset).u.string; + + /* 80 is more than what we need for the fixed parts below */ + len = 80 + strlen(user) + strlen(db); + + /* hostname and and password are the only optionals */ + if (server) + len += strlen(server); + if (pass) + len += strlen(pass); + if (port) + len += 20; + + connstr = (char *) malloc(len); + if (!connstr) + return -ENOMEM; + + if (server) { + strcpy(connstr, " host="); + strcat(connstr, server); + } + + if (port) { + char portbuf[20]; + snprintf(portbuf, sizeof(portbuf), " port=%u", port); + strcat(connstr, portbuf); + } + + strcat(connstr, " dbname="); + strcat(connstr, db); + strcat(connstr, " user="); + strcat(connstr, user); + + if (pass) { + strcat(connstr, " password="); + strcat(connstr, pass); + } + + dbh = PQconnectdb(connstr); + if (PQstatus(dbh) != CONNECTION_OK) { + close_db(upi); + return -1; + } + + if (pgsql_namespace()) { + ulogd_log(ULOGD_ERROR, "unable to test for pgsql schemas\n"); + close_db(upi); + return -1; + } + + return 0; +} + +static int escape_string_pgsql(struct ulogd_pluginstance *upi, + char *dst, const char *src, unsigned int len) +{ + PQescapeString(dst, src, strlen(res->value.ptr)); + return 0; +} + +static int execute_pgsql(struct ulogd_pluginstance *upi, + const char *stmt, unsigned int len) +{ + struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; + + pi->pgres = PQexec(dbh, stmt); + if (!pi->pgres || PQresultStatus(pi->pgres) != PGRES_COMMAND_OK) + return -1; + + return 0; +} + +static char *strerror_pgsql(struct ulogd_pluginstance *upi) +{ + struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; + return PQresultErrorMessage(pi->pgres); +} + +static struct db_driver db_driver_pgsql = { + .get_columns = &get_columns_pgsql, + .open_db = &open_db_pgsql, + .close_db = &close_db_pgsql, + .escape_string = &escape_string_pgsql, + .execute = &execute_pgsql, + .strerror = &strerror_pgsql, +}; + +static int configure_pgsql(struct ulogd_pluginstance *upi, + struct ulogd_pluginstance_stack *stack) +{ + struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; + + di->driver = &db_driver_pgsql; + + return configure_db(upi, stack); +} + +static struct ulogd_plugin pgsql_plugin = { + .name = "PGSQL", + .input = { + .keys = NULL, + .num_keys = 0, + .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW, + }, + .output = { + .type = ULOGD_DTYPE_SINK, + }, + .config_kset = &pgsql_kset, + .priv_size = sizeof(struct pgsql_instance), + .start = &start_pgsql, + .stop = &stop_db, + .signal = &signal_db, + .interp = &interp_pgsql, + .version = ULOGD_VERSION, +}; + +void __attribute__ ((constructor)) init(void); + +void _init(void) +{ + ulogd_register_plugin(&pgsql_plugin); +} |