From 3ebf985c9020aae4eb6b84768c94ba46d6dc17fc Mon Sep 17 00:00:00 2001 From: laforge Date: Sat, 15 Oct 2005 22:43:51 +0000 Subject: semi-complete mysql output plugin port --- output/mysql/ulogd_output_MYSQL.c | 525 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 525 insertions(+) create mode 100644 output/mysql/ulogd_output_MYSQL.c (limited to 'output/mysql/ulogd_output_MYSQL.c') diff --git a/output/mysql/ulogd_output_MYSQL.c b/output/mysql/ulogd_output_MYSQL.c new file mode 100644 index 0000000..78d9cb4 --- /dev/null +++ b/output/mysql/ulogd_output_MYSQL.c @@ -0,0 +1,525 @@ +/* ulogd_MYSQL.c, Version $Revision$ + * + * ulogd output plugin for logging to a MySQL database + * + * (C) 2000-2005 by Harald Welte + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * $Id$ + * + * 15 May 2001, Alex Janssen : + * Added a compability option for older MySQL-servers, which + * don't support mysql_real_escape_string + * + * 17 May 2001, Alex Janssen : + * Added the --with-mysql-log-ip-as-string feature. This will log + * IP's as string rather than an unsigned long integer to the database. + * See ulogd/doc/mysql.table.ipaddr-as-string as an example. + * BE WARNED: This has _WAY_ less performance during table searches. + * + * 09 Feb 2005, Sven Schuster : + * Added the "port" parameter to specify ports different from 3306 + * + * 12 May 2005, Jozsef Kadlecsik + * Added reconnecting to lost mysql server. + * + * 15 Oct 2005, Harald Welte + * Port to ulogd2 (@ 0sec conference, Bern, Suisse) + */ + +#include +#include +#include +#include + +#include +#include + +#include + +#ifdef DEBUG_MYSQL +#define DEBUGP(x, args...) fprintf(stderr, x, ## args) +#else +#define DEBUGP(x, args...) +#endif + +struct mysql_instance { + /* the database handle we are using */ + MYSQL *dbh; + + /* buffer for our insert statement */ + char *stmt; + + /* pointer to the beginning of the "VALUES" part */ + char *stmt_val; + + /* pointer to current inser position in statement */ + char *stmt_ins; + + /* Attempt to reconnect if connection is lost */ + time_t reconnect = 0; + #define TIME_ERR ((time_t)-1) /* Be paranoid */ +}; + +/* our configuration directives */ +static struct config_keyset mysql_kset = { + .num_ces = 8, + .ces = { + { + .key = "db", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "host", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "user", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "pass", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "table", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "port", + .type = CONFIG_TYPE_INT, + }, + { + .key = "reconnect", + .type = CONFIG_TYPE_INT, + }, + { + .key = "connect_timeout", + .type = CONFIG_TYPE_INT, + }, + { + .key = "ip_as_string", + .type = CONFIG_TYPE_INT, + }, + }, +}; +#define db_ce(x) (x->ces[0]) +#define host_ce(x) (x->ces[1]) +#define user_ce(x) (x->ces[2]) +#define pass_ce(x) (x->ces[3]) +#define table_ce(x) (x->ces[4]) +#define port_ce(x) (x->ces[5]) +#define reconnect_ce(x) (x->ces[6]) +#define timeout_ce(x) (x->ces[7]) +#define asstring_ce(x) (x->ces[8]) + +static struct ulogd_plugin mysql_plugin; + +// FIXME static int _mysql_init_db(ulog_iret_t *result); + +/* our main output function, called by ulogd */ +static int mysql_output(struct ulogd_pluginstance *upi) +{ + struct mysql_instance *mi = (struct mysql_instance *) &upi->private; + struct _field *f; + struct ulogd_key *res; + char *tmpstr; /* need this for --log-ip-as-string */ + struct in_addr addr; + + stmt_ins = stmt_val; + + for (i = 0; i < upi->input.num; i++) { + res = upi->input[i].source; + + if (!res) + ulogd_log(ULOGD_NOTICE, + "no result for %s ?!?\n", f->name); + + if (!res || !IS_VALID(res)) { + /* no result, we have to fake something */ + sprintf(stmt_ins, "NULL,"); + stmt_ins = stmt + strlen(stmt); + continue; + } + + switch (res->type) { + case ULOGD_RET_INT8: + sprintf(stmt_ins, "%d,", res->value.i8); + break; + case ULOGD_RET_INT16: + sprintf(stmt_ins, "%d,", res->value.i16); + break; + case ULOGD_RET_INT32: + sprintf(stmt_ins, "%d,", res->value.i32); + break; + case ULOGD_RET_INT64: + sprintf(stmt_ins, "%lld,", res->value.i64); + break; + case ULOGD_RET_UINT8: + sprintf(stmt_ins, "%u,", res->value.ui8); + break; + case ULOGD_RET_UINT16: + sprintf(stmt_ins, "%u,", res->value.ui16); + break; + case ULOGD_RET_IPADDR: + if (asstring_ce(upi).u.value) { + memset(&addr, 0, sizeof(addr)); + addr.s_addr = ntohl(res->value.ui32); + *stmt_ins++ = '\''; + tmpstr = inet_ntoa(addr); +#ifdef OLD_MYSQL + mysql_escape_string(stmt_ins, tmpstr, + strlen(tmpstr)); +#else + mysql_real_escape_string(dbh, stmt_ins, + tmpstr, + strlen(tmpstr)); +#endif /* OLD_MYSQL */ + stmt_ins = stmt + strlen(stmt); + sprintf(stmt_ins, "',"); + break; + } + /* fallthrough when logging IP as u_int32_t */ + case ULOGD_RET_UINT32: + sprintf(stmt_ins, "%u,", res->value.ui32); + break; + case ULOGD_RET_UINT64: + sprintf(stmt_ins, "%llu,", res->value.ui64); + break; + case ULOGD_RET_BOOL: + sprintf(stmt_ins, "'%d',", res->value.b); + break; + case ULOGD_RET_STRING: + *stmt_ins++ = '\''; +#ifdef OLD_MYSQL + mysql_escape_string(stmt_ins, res->value.ptr, + strlen(res->value.ptr)); +#else + mysql_real_escape_string(dbh, stmt_ins, + res->value.ptr, strlen(res->value.ptr)); +#endif + stmt_ins = stmt + strlen(stmt); + sprintf(stmt_ins, "',"); + break; + case ULOGD_RET_RAW: + ulogd_log(ULOGD_NOTICE, + "%s: type RAW not supported by MySQL\n", + res->key); + break; + default: + ulogd_log(ULOGD_NOTICE, + "unknown type %d for %s\n", + res->type, res->key); + break; + } + mi->stmt_ins = mi->stmt + strlen(mi->stmt); + } + *(mi->stmt_ins - 1) = ')'; + DEBUGP("stmt=#%s#\n", mi->stmt); + + /* now we have created our statement, insert it */ + + if (mysql_real_query(mi->dbh, mi->stmt, strlen(mi->stmt))) { + ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", + mysql_error(mi->dbh)); + + // FIXME return _mysql_init_db(upi); + } + + return 0; +} + +/* no connection, plugin disabled */ +static int mysql_output_disabled(struct ulogd_pluginstance *upi) +{ + return 0; +} + +#define MYSQL_INSERTTEMPL "insert into X (Y) values (Z)" +#define MYSQL_VALSIZE 100 + +/* create the static part of our insert statement */ +static int mysql_createstmt(struct ulogd_pluginstance *upi) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + struct _field *f; + unsigned int size; + char buf[ULOGD_MAX_KEYLEN]; + char *underscore; + + if (mi->stmt) + free(mi->stmt); + + /* caclulate the size for the insert statement */ + size = strlen(MYSQL_INSERTTEMPL) + strlen(table_ce(upi->config_kset).u.string); + + /* FIXME: we don't know the number *Sigh* */ + for (i = 0; i < upi->input.num_keys; i++) { + /* we need space for the key and a comma, as well as + * enough space for the values */ + size += strlen(upi->input[i].name) + 1 + MYSQL_VALSIZE; + } + + ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size); + + mi->stmt = (char *) malloc(size); + if (!mi->stmt) { + ulogd_log(ULOGD_ERROR, "OOM!\n"); + return -ENOMEM; + } + + sprintf(mi->stmt, "insert into %s (", table_ce(upi).u.string); + mi->stmt_val = mi->stmt + strlen(mi->stmt); + + for (i = 0; i < upi->input.num_keys; i++) { + strncpy(buf, upi->input[i].name, ULOGD_MAX_KEYLEN); + while ((underscore = strchr(buf, '.'))) + *underscore = '_'; + sprintf(mi->stmt_val, "%s,", buf); + mi->stmt_val = mi->stmt + strlen(mi->stmt); + } + *(stmt_val - 1) = ')'; + + sprintf(mi->stmt_val, " values ("); + mi->stmt_val = mi->stmt + strlen(mi->stmt); + + ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", mi->stmt); + + return 0; +} + +/* find out which columns the table has */ +static int mysql_get_columns(struct ulogd_pluginstance *upi, const char *table) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + MYSQL_RES *result; + MYSQL_FIELD *field; + struct ulogd_key *f, *f2; + int i; + + if (!mi->dbh) + return -1; + + result = mysql_list_fields(mi->dbh, table_ce(upi->config_ces).u.string, NULL); + if (!result) + return -1; + + /* Thea idea here is that we can create a pluginstance specific input key + * array by not specifyling a plugin input key list. ulogd core will then + * set upi->input to NULL. Yes, this creates a memory hole in case the core + * just calls ->configure() and then aborts (and thus never free()s the memory + * we allocate here. FIXME. */ + + /* Cleanup before reconnect */ + if (upi->input) { + free(upi->input); + upi->input = NULL; + } + + upi->input = malloc(sizeof(struct ulogd_key) * mysql_field_count(mi->dbh)); + if (!upi->input) + return -ENOMEM; + + i = 0; + while ((field = mysql_fetch_field(result))) { + char buf[ULOGD_MAX_KEYLEN+1]; + char *underscore; + int id; + + /* replace all underscores with dots */ + strncpy(buf, field->name, ULOGD_MAX_KEYLEN); + while ((underscore = strchr(buf, '_'))) + *underscore = '.'; + + DEBUGP("field '%s' found: ", buf); + + /* add it u list of input keys */ + strncpy(pi->input[i].name, buf, ULOGD_MAX_KEYLEN); + i++; + } + + mysql_free_result(result); + return 0; +} + +/* make connection and select database */ +static int open_db(struct ulogd_pluginstance *upi, char *server, + int port, char *user, char *pass, char *db) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + unsigned int connect_timeout = timeout_ce(upi->config_kset).u.value; + + mi->dbh = mysql_init(NULL); + if (!mi->dbh) + return -1; + + if (connect_timeout) + mysql_options(mi->dbh, MYSQL_OPT_CONNECT_TIMEOUT, + (const char *) &connect_timeout); + + if (!mysql_real_connect(mi->dbh, server, user, pass, db, port, NULL, 0)) + return -1; + + return 0; +} + +#if 0 +static int init_reconnect(struct ulogd_pluginstance *upi) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + if (reconnect_ce(upi->config_kset).u.value) { + mi->reconnect = time(NULL); + if (mi->reconnect != TIME_ERR) { + ulogd_log(ULOGD_ERROR, "no connection to database, " + "attempting to reconnect " + "after %u seconds\n", + reconnect_ce(upi).u.value); + mi->reconnect += reconnect_ce(upi).u.value; + mysql_plugin.interp = &_mysql_init_db; + return -1; + } + } + /* Disable plugin permanently */ + mysql_plugin.interp = &mysql_output_disabled; + + return 0; +} + +static int _mysql_init_db(struct ulogd_pluginstance *upi) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + if (mi->reconnect && mi->reconnect > time(NULL)) + return 0; + + if (open_db(upi, host_ce(upi->config_kset).u.string, + port_ce(upi->config_kset).u.value, + user_ce(upi->config_kset).u.string, + pass_ce(upi->config_kset).u.string, + db_ce(upi->config_kset).u.string)) { + ulogd_log(ULOGD_ERROR, "can't establish database connection\n"); + return init_reconnect(upi); + } + +#if 0 + /* read the fieldnames to know which values to insert */ + if (mysql_get_columns(table_ce.u.string)) { + ulogd_log(ULOGD_ERROR, "unable to get mysql columns\n"); + return init_reconnect(); + } + mysql_createstmt(); +#endif + /* enable plugin */ + mysql_plugin.output = &mysql_output; + + mi->reconnect = 0; + + return mysql_output(result); +} +#endif + +static int configure_mysql(struct ulogd_pluginstance *upi, + struct ulogd_pluginstance_stack *stack) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + int ret; + + /* First: Parse configuration file section for this instance */ + ret = config_parse_file(upi->id, upi->config_kset); + if (ret < 0) + return ret; + + /* Second: Open Database */ + ret = open_db(upi, host_ce(upi->config_kset).u.string, + port_ce(upi->config_kset).u.value, + user_ce(upi->config_kset).u.string, + pass_ce(upi->config_kset).u.string, + db_ce(upi->config_kset).u.string); + if (ret < 0) + return ret; + + /* Third: Determine required input keys for given table */ + ret = mysql_get_columns(upi); + + /* Close database, since ulogd core could just call configure + * but abort during input key resolving routines. configure + * doesn't have a destructor... */ + mysql_close(mi->dbh); + + return ret; +} + +static int start_mysql(struct ulogd_pluginstance *upi) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + int ret; + + ret = open_db(upi, host_ce(upi->config_kset).u.string, + port_ce(upi->config_kset).u.value, + user_ce(upi->config_kset).u.string, + pass_ce(upi->config_kset).u.string, + db_ce(upi->config_kset).u.string); + if (ret < 0) + return ret; + + ret = mysql_createstmt(upi); + if (ret < 0) + mysql_close(mi->dbh); + + return ret; +} + +static int stop_mysql(struct ulogd_pluginstance *upi) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + mysql_close(mi->dbh); + + /* try to free our dynamically allocated input key array */ + if (upi->input) { + free(upi->input); + upi->input = 0; + } + return 0; +} + +static struct ulogd_plugin mysql_plugin = { + .name = "MYSQL", + .input = { + .keys = , + .num_keys = , + .type = ULOGD_DTYPE_PACKET, + }, + .output = { + .type = ULOGD_DTYPE_SINK, + }, + .config_kset = mysql_kset, + .priv_size = sizeof(struct mysql_instance), + .configure = &configure_mysql, + .start = &start_mysql, + .stop = &stop_mysql, + .signal = &signal_mysql, + .interp = &interp_mysql, +}; + +void __attribute__ ((constructor)) init(void); + +void init(void) +{ + ulogd_register_plugin(&mysql_plugin); +} -- cgit v1.2.3