/* ulogd_PGSQL.c, Version $Revision$ * * ulogd output plugin for logging to a PGSQL database * * (C) 2000-2005 by Harald Welte * This software is distributed under the terms of GNU GPL * * This plugin is based on the MySQL plugin made by Harald Welte. * The support PostgreSQL were made by Jakab Laszlo. * */ #include #include #include #include #include #include #include #include "../../utils/db.c" #ifdef DEBUG_PGSQL #define DEBUGP(x, args...) fprintf(stderr, x, ## args) #else #define DEBUGP(x, args...) #endif struct pgsql_instance { struct db_instance db_inst; PGconn *dbh; PGresult *pgres; unsigned char pgsql_have_schemas; } #define TIME_ERR ((time_t)-1) /* our configuration directives */ static struct config_keyset pgsql_kset = { .num_ces = DB_CE_NUM + 6, .ces = { DB_CES, { .key = "db", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, }, { .key = "host", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, }, { .key = "user", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, }, { .key = "pass", .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_NONE, }, { .next = &schema_ce, .key = "port", .type = CONFIG_TYPE_INT, }, { .key = "schema", .type = CONFIG_TYPE_STRING, .u.string = "public", }, }, }; #define db_ce(x) (x->ces[DB_CE_NUM+0]) #define host_ce(x) (x->ces[DB_CE_NUM+1]) #define user_ce(x) (x->ces[DB_CE_NUM+2]) #define pass_ce(x) (x->ces[DB_CE_NUM+3]) #define port_ce(x) (x->ces[DB_CE_NUM+5]) #define schema_ce(x) (x->ces[DB_CE_NUM+6]) #if 0 /* our main output function, called by ulogd */ static int pgsql_output(ulog_iret_t *result) { struct _field *f; ulog_iret_t *res; PGresult *pgres; #ifdef IP_AS_STRING char *tmpstr; /* need this for --log-ip-as-string */ struct in_addr addr; #endif stmt_ins = stmt_val; for (f = fields; f; f = f->next) { res = keyh_getres(f->id); if (!res) { ulogd_log(ULOGD_NOTICE, "no result for %s ?!?\n", f->name); } if (!res || !IS_VALID((*res))) { /* no result, we have to fake something */ sprintf(stmt_ins, "NULL,"); stmt_ins = stmt + strlen(stmt); continue; } switch (res->type) { case ULOGD_RET_INT8: sprintf(stmt_ins, "%d,", res->value.i8); break; case ULOGD_RET_INT16: sprintf(stmt_ins, "%d,", res->value.i16); break; case ULOGD_RET_INT32: sprintf(stmt_ins, "%d,", res->value.i32); break; case ULOGD_RET_INT64: sprintf(stmt_ins, "%lld,", res->value.i64); break; case ULOGD_RET_UINT8: sprintf(stmt_ins, "%u,", res->value.ui8); break; case ULOGD_RET_UINT16: sprintf(stmt_ins, "%u,", res->value.ui16); break; case ULOGD_RET_IPADDR: #ifdef IP_AS_STRING *stmt_ins++ = '\''; memset(&addr, 0, sizeof(addr)); addr.s_addr = ntohl(res->value.ui32); tmpstr = (char *)inet_ntoa(addr); PQescapeString(stmt_ins,tmpstr,strlen(tmpstr)); stmt_ins = stmt + strlen(stmt); sprintf(stmt_ins, "',"); break; #endif /* IP_AS_STRING */ /* EVIL: fallthrough when logging IP as * u_int32_t */ case ULOGD_RET_UINT32: sprintf(stmt_ins, "%u,", res->value.ui32); break; case ULOGD_RET_UINT64: sprintf(stmt_ins, "%llu,", res->value.ui64); break; case ULOGD_RET_BOOL: sprintf(stmt_ins, "'%d',", res->value.b); break; case ULOGD_RET_STRING: *stmt_ins++ = '\''; PQescapeString(stmt_ins,res->value.ptr,strlen(res->value.ptr)); stmt_ins = stmt + strlen(stmt); sprintf(stmt_ins, "',"); break; case ULOGD_RET_RAW: ulogd_log(ULOGD_NOTICE,"%s: pgsql doesn't support type RAW\n",res->key); sprintf(stmt_ins, "NULL,"); break; default: ulogd_log(ULOGD_NOTICE, "unknown type %d for %s\n", res->type, res->key); break; } stmt_ins = stmt + strlen(stmt); } *(stmt_ins - 1) = ')'; DEBUGP("stmt=#%s#\n", stmt); /* now we have created our statement, insert it */ /* Added code by Jaki */ pgres = PQexec(dbh, stmt); if(!pgres || PQresultStatus(pgres) != PGRES_COMMAND_OK) { ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", PQresultErrorMessage(pgres)); return 1; } return 0; } #endif #define PGSQL_HAVE_NAMESPACE_TEMPLATE "SELECT nspname FROM pg_namespace n WHERE n.nspname='%s'" /* Determine if server support schemas */ static int pgsql_namespace(void) { PGresult *result; char pgbuf[strlen(PGSQL_HAVE_NAMESPACE_TEMPLATE)+strlen(schema_ce.u.string)+1]; if (!dbh) return 1; sprintf(pgbuf, PGSQL_HAVE_NAMESPACE_TEMPLATE, schema_ce.u.string); ulogd_log(ULOGD_DEBUG, "%s\n", pgbuf); result = PQexec(dbh, pgbuf); if (!result) { ulogd_log(ULOGD_DEBUG, "\n result false"); return 1; } if (PQresultStatus(result) == PGRES_TUPLES_OK) { ulogd_log(ULOGD_DEBUG, "using schema %s\n", schema_ce.u.string); pgsql_have_schemas = 1; } else { pgsql_have_schemas = 0; } PQclear(result); return 0; } #define PGSQL_GETCOLUMN_TEMPLATE "SELECT a.attname FROM pg_class c, pg_attribute a WHERE c.relname ='%s' AND a.attnum>0 AND a.attrelid=c.oid ORDER BY a.attnum" #define PGSQL_GETCOLUMN_TEMPLATE_SCHEMA "SELECT a.attname FROM pg_attribute a, pg_class c LEFT JOIN pg_namespace n ON c.relnamespace=n.oid WHERE c.relname ='%s' AND n.nspname='%s' AND a.attnum>0 AND a.attrelid=c.oid AND a.attisdropped=FALSE ORDER BY a.attnum" /* find out which columns the table has */ static int get_columns_pgsql(struct ulogd_pluginstance *upi) { struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; PGresult *result; char pgbuf[strlen(PGSQL_GETCOLUMN_TEMPLATE_SCHEMA) + strlen(table) + strlen(schema_ce.u.string) + 2]; int intaux; if (!pi->dbh) { ulogd_log(ULOGD_ERROR, "no database handle\n"); return 1; } if (pgsql_have_schemas) { snprintf(pgbuf, sizeof(pgbuf)-1, PGSQL_GETCOLUMN_TEMPLATE_SCHEMA, table_ce(upi->config_kset).u.string, schema_ce(upi->config_kset).u.string); } else { snprintf(pgbuf, sizeof(pgbuf)-1, PGSQL_GETCOLUMN_TEMPLATE, table_ce(upi->config_kset).u.string); } ulogd_log(ULOGD_DEBUG, "%s\n", pgbuf); result = PQexec(dbh, pgbuf); if (!result) { ulogd_log(ULOGD_DEBUG, "result false"); return -1; } if (PQresultStatus(result) != PGRES_TUPLES_OK) { ulogd_log(ULOGD_DEBUG, "pres_command_not_ok"); return -1; } if (upi->input.keys) free(upi->input.keys); upi->input.num_keys = PQntuples(result); ulogd_log(ULOGD_DEBUG, "%u fields in table\n", upi->input.num_keys); upi->input.keys = malloc(sizeof(struct ulogd_key) * upi->input.num_keys); if (!upi->input.keys) { upi->input.num_keys = 0; ulogd_log(ULOGD_ERROR, "ENOMEM\n"); return -ENOMEM; } memset(upi->input.keys, 0, sizeof(struct ulogd_key) * upi->input.num_keys); for (intaux = 0; intaux < PQntuples(result); intaux++) { char buf[ULOGD_MAX_KEYLEN+1]; char *underscore; int id; /* replace all underscores with dots */ strncpy(buf, PQgetvalue(result, intaux, 0), ULOGD_MAX_KEYLEN); while ((underscore = strchr(buf, '_'))) *underscore = '.'; DEBUGP("field '%s' found: ", buf); /* add it to list of input keys */ strncpy(upi->input.keys[i].name, buf, ULOGD_MAX_KEYLEN); } /* FIXME: id? */ PQclear(result); return 0; } static int close_db_pgsql(struct ulogd_pluginstance *upi) { struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; return PQfinish(pi->dbh); } /* make connection and select database */ static int open_db_pgsql(struct ulogd_pluginstance *upi) { struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; int len; char *connstr; char *server = host_ce(upi->config_kset).u.string; char *port = port_ce(upi->config_kset).u.string; char *user = user_ce(upi->config_kset).u.string; char *pass = pass_ce(upi->config_kset).u.string; char *db = db_ce(upi->config_kset).u.string; /* 80 is more than what we need for the fixed parts below */ len = 80 + strlen(user) + strlen(db); /* hostname and and password are the only optionals */ if (server) len += strlen(server); if (pass) len += strlen(pass); if (port) len += 20; connstr = (char *) malloc(len); if (!connstr) return -ENOMEM; if (server) { strcpy(connstr, " host="); strcat(connstr, server); } if (port) { char portbuf[20]; snprintf(portbuf, sizeof(portbuf), " port=%u", port); strcat(connstr, portbuf); } strcat(connstr, " dbname="); strcat(connstr, db); strcat(connstr, " user="); strcat(connstr, user); if (pass) { strcat(connstr, " password="); strcat(connstr, pass); } dbh = PQconnectdb(connstr); if (PQstatus(dbh) != CONNECTION_OK) { close_db(upi); return -1; } if (pgsql_namespace()) { ulogd_log(ULOGD_ERROR, "unable to test for pgsql schemas\n"); close_db(upi); return -1; } return 0; } static int escape_string_pgsql(struct ulogd_pluginstance *upi, char *dst, const char *src, unsigned int len) { PQescapeString(dst, src, strlen(res->value.ptr)); return 0; } static int execute_pgsql(struct ulogd_pluginstance *upi, const char *stmt, unsigned int len) { struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; pi->pgres = PQexec(dbh, stmt); if (!pi->pgres || PQresultStatus(pi->pgres) != PGRES_COMMAND_OK) return -1; return 0; } static char *strerror_pgsql(struct ulogd_pluginstance *upi) { struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; return PQresultErrorMessage(pi->pgres); } static struct db_driver db_driver_pgsql = { .get_columns = &get_columns_pgsql, .open_db = &open_db_pgsql, .close_db = &close_db_pgsql, .escape_string = &escape_string_pgsql, .execute = &execute_pgsql, .strerror = &strerror_pgsql, }; static int configure_pgsql(struct ulogd_pluginstance *upi, struct ulogd_pluginstance_stack *stack) { struct pgsql_instance *pi = (struct pgsql_instance *) upi->private; di->driver = &db_driver_pgsql; return configure_db(upi, stack); } static struct ulogd_plugin pgsql_plugin = { .name = "PGSQL", .input = { .keys = NULL, .num_keys = 0, .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW, }, .output = { .type = ULOGD_DTYPE_SINK, }, .config_kset = &pgsql_kset, .priv_size = sizeof(struct pgsql_instance), .start = &start_pgsql, .stop = &stop_db, .signal = &signal_db, .interp = &interp_pgsql, .version = ULOGD_VERSION, }; void __attribute__ ((constructor)) init(void); void _init(void) { ulogd_register_plugin(&pgsql_plugin); }