From 2177d789d1320ae6df9377815607fb02e44ac2b3 Mon Sep 17 00:00:00 2001 From: "/C=DE/ST=Berlin/L=Berlin/O=Netfilter Project/OU=Development/CN=laforge/emailAddress=laforge@netfilter.org" Date: Thu, 8 Dec 2005 12:42:06 +0000 Subject: convert mysql to new DB api --- output/mysql/ulogd_output_MYSQL.c | 433 ++++++-------------------------------- 1 file changed, 62 insertions(+), 371 deletions(-) (limited to 'output/mysql/ulogd_output_MYSQL.c') diff --git a/output/mysql/ulogd_output_MYSQL.c b/output/mysql/ulogd_output_MYSQL.c index 68106bb..766fdb3 100644 --- a/output/mysql/ulogd_output_MYSQL.c +++ b/output/mysql/ulogd_output_MYSQL.c @@ -48,6 +48,8 @@ #include #include +#include "../../util/db.c" + #ifdef DEBUG_MYSQL #define DEBUGP(x, args...) fprintf(stderr, x, ## args) #else @@ -55,20 +57,15 @@ #endif struct mysql_instance { + struct db_instance db_inst; MYSQL *dbh; /* the database handle we are using */ - char *stmt; /* buffer for our insert statement */ - char *stmt_val; /* pointer to the beginning of the "VALUES" part */ - char *stmt_ins; /* pointer to current inser position in statement */ - time_t reconnect; /* Attempt to reconnect if connection is lost */ - - int (*interp)(struct ulogd_pluginstance *upi); }; -#define TIME_ERR ((time_t)-1) /* Be paranoid */ /* our configuration directives */ -static struct config_keyset mysql_kset = { - .num_ces = 9, +static struct config_keyset kset_mysql = { + .num_ces = DB_CE_NUM+5, .ces = { + DB_CES, { .key = "db", .type = CONFIG_TYPE_STRING, @@ -89,235 +86,19 @@ static struct config_keyset mysql_kset = { .type = CONFIG_TYPE_STRING, .options = CONFIG_OPT_MANDATORY, }, - { - .key = "table", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, - }, { .key = "port", .type = CONFIG_TYPE_INT, }, - { - .key = "reconnect", - .type = CONFIG_TYPE_INT, - }, - { - .key = "connect_timeout", - .type = CONFIG_TYPE_INT, - }, - { - .key = "ip_as_string", - .type = CONFIG_TYPE_INT, - }, }, }; -#define db_ce(x) (x->ces[0]) -#define host_ce(x) (x->ces[1]) -#define user_ce(x) (x->ces[2]) -#define pass_ce(x) (x->ces[3]) -#define table_ce(x) (x->ces[4]) -#define port_ce(x) (x->ces[5]) -#define reconnect_ce(x) (x->ces[6]) -#define timeout_ce(x) (x->ces[7]) -#define asstring_ce(x) (x->ces[8]) - -static struct ulogd_plugin mysql_plugin; - -static int _mysql_init_db(struct ulogd_pluginstance *upi); - -/* this is a wrapper that just calls the current real - * interp function */ -static int interp_mysql(struct ulogd_pluginstance *upi) -{ - struct mysql_instance *mi = (struct mysql_instance *) &upi->private; - return mi->interp(upi); -} - -/* our main output function, called by ulogd */ -static int __interp_mysql(struct ulogd_pluginstance *upi) -{ - struct mysql_instance *mi = (struct mysql_instance *) &upi->private; - int i; - - mi->stmt_ins = mi->stmt_val; - - for (i = 0; i < upi->input.num_keys; i++) { - struct ulogd_key *res = upi->input.keys[i].u.source; - - if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE) - continue; - - if (!res) - ulogd_log(ULOGD_NOTICE, "no source for `%s' ?!?\n", - upi->input.keys[i].name); - - if (!res || !IS_VALID(*res)) { - /* no result, we have to fake something */ - mi->stmt_ins += sprintf(mi->stmt_ins, "NULL,"); - continue; - } - - switch (res->type) { - char *tmpstr; - struct in_addr addr; - case ULOGD_RET_INT8: - sprintf(mi->stmt_ins, "%d,", res->u.value.i8); - break; - case ULOGD_RET_INT16: - sprintf(mi->stmt_ins, "%d,", res->u.value.i16); - break; - case ULOGD_RET_INT32: - sprintf(mi->stmt_ins, "%d,", res->u.value.i32); - break; - case ULOGD_RET_INT64: - sprintf(mi->stmt_ins, "%lld,", res->u.value.i64); - break; - case ULOGD_RET_UINT8: - sprintf(mi->stmt_ins, "%u,", res->u.value.ui8); - break; - case ULOGD_RET_UINT16: - sprintf(mi->stmt_ins, "%u,", res->u.value.ui16); - break; - case ULOGD_RET_IPADDR: - if (asstring_ce(upi->config_kset).u.value) { - memset(&addr, 0, sizeof(addr)); - addr.s_addr = ntohl(res->u.value.ui32); - *(mi->stmt_ins++) = '\''; - tmpstr = inet_ntoa(addr); - #ifdef OLD_MYSQL - mysql_escape_string(mi->stmt_ins, tmpstr, - strlen(tmpstr)); - #else - mysql_real_escape_string(mi->dbh, mi->stmt_ins, - tmpstr, - strlen(tmpstr)); - #endif /* OLD_MYSQL */ - mi->stmt_ins = mi->stmt + strlen(mi->stmt); - sprintf(mi->stmt_ins, "',"); - break; - } - /* fallthrough when logging IP as u_int32_t */ - case ULOGD_RET_UINT32: - sprintf(mi->stmt_ins, "%u,", res->u.value.ui32); - break; - case ULOGD_RET_UINT64: - sprintf(mi->stmt_ins, "%llu,", res->u.value.ui64); - break; - case ULOGD_RET_BOOL: - sprintf(mi->stmt_ins, "'%d',", res->u.value.b); - break; - case ULOGD_RET_STRING: - *(mi->stmt_ins++) = '\''; - if (res->u.value.ptr) { - #ifdef OLD_MYSQL - mi->stmt_ins += mysql_escape_string(mi->stmt_ins, - res->u.value.ptr, - strlen(res->u.value.ptr)); - #else - mi->stmt_ins += mysql_real_escape_string( - mi->dbh, mi->stmt_ins, - res->u.value.ptr, - strlen(res->u.value.ptr)); - #endif - } - sprintf(mi->stmt_ins, "',"); - break; - case ULOGD_RET_RAW: - ulogd_log(ULOGD_NOTICE, - "%s: type RAW not supported by MySQL\n", - upi->input.keys[i].name); - break; - default: - ulogd_log(ULOGD_NOTICE, - "unknown type %d for %s\n", - res->type, upi->input.keys[i].name); - break; - } - mi->stmt_ins = mi->stmt + strlen(mi->stmt); - } - *(mi->stmt_ins - 1) = ')'; - DEBUGP("stmt=#%s#\n", mi->stmt); - - /* now we have created our statement, insert it */ - - if (mysql_real_query(mi->dbh, mi->stmt, strlen(mi->stmt))) { - ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", - mysql_error(mi->dbh)); - return _mysql_init_db(upi); - } - - return 0; -} - -/* no connection, plugin disabled */ -static int mysql_output_disabled(struct ulogd_pluginstance *upi) -{ - return 0; -} - -#define MYSQL_INSERTTEMPL "insert into X (Y) values (Z)" -#define MYSQL_VALSIZE 100 - -/* create the static part of our insert statement */ -static int mysql_createstmt(struct ulogd_pluginstance *upi) -{ - struct mysql_instance *mi = (struct mysql_instance *) upi->private; - struct _field *f; - unsigned int size; - char buf[ULOGD_MAX_KEYLEN]; - char *underscore; - int i; - - if (mi->stmt) - free(mi->stmt); - - /* caclulate the size for the insert statement */ - size = strlen(MYSQL_INSERTTEMPL) + - strlen(table_ce(upi->config_kset).u.string); - - for (i = 0; i < upi->input.num_keys; i++) { - if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE) - continue; - /* we need space for the key and a comma, as well as - * enough space for the values */ - size += strlen(upi->input.keys[i].name) + 1 + MYSQL_VALSIZE; - } - - ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size); - - mi->stmt = (char *) malloc(size); - if (!mi->stmt) { - ulogd_log(ULOGD_ERROR, "OOM!\n"); - return -ENOMEM; - } - - sprintf(mi->stmt, "insert into %s (", - table_ce(upi->config_kset).u.string); - mi->stmt_val = mi->stmt + strlen(mi->stmt); - - for (i = 0; i < upi->input.num_keys; i++) { - if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE) - continue; - - strncpy(buf, upi->input.keys[i].name, ULOGD_MAX_KEYLEN); - while ((underscore = strchr(buf, '.'))) - *underscore = '_'; - sprintf(mi->stmt_val, "%s,", buf); - mi->stmt_val = mi->stmt + strlen(mi->stmt); - } - *(mi->stmt_val - 1) = ')'; - - sprintf(mi->stmt_val, " values ("); - mi->stmt_val = mi->stmt + strlen(mi->stmt); - - ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", mi->stmt); - - return 0; -} - +#define db_ce(x) (x->ces[DB_CE_NUM+0]) +#define host_ce(x) (x->ces[DB_CE_NUM+1]) +#define user_ce(x) (x->ces[DB_CE_NUM+2]) +#define pass_ce(x) (x->ces[DB_CE_NUM+3]) +#define port_ce(x) (x->ces[DB_CE_NUM+4]) /* find out which columns the table has */ -static int mysql_get_columns(struct ulogd_pluginstance *upi) +static int get_columns_mysql(struct ulogd_pluginstance *upi) { struct mysql_instance *mi = (struct mysql_instance *) upi->private; MYSQL_RES *result; @@ -372,7 +153,7 @@ static int mysql_get_columns(struct ulogd_pluginstance *upi) DEBUGP("field '%s' found\n", buf); - /* add it u list of input keys */ + /* add it to list of input keys */ strncpy(upi->input.keys[i].name, buf, ULOGD_MAX_KEYLEN); } /* MySQL Auto increment ... ID :) */ @@ -382,12 +163,23 @@ static int mysql_get_columns(struct ulogd_pluginstance *upi) return 0; } +static int close_db_mysql(struct ulogd_pluginstance *upi) +{ + struct mysql_instance *mi = (struct mysql_instance *) upi->private; + mysql_close(mi->dbh); + return 0; +} + /* make connection and select database */ -static int open_db(struct ulogd_pluginstance *upi, char *server, - int port, char *user, char *pass, char *db) +static int open_db_mysql(struct ulogd_pluginstance *upi) { struct mysql_instance *mi = (struct mysql_instance *) upi->private; unsigned int connect_timeout = timeout_ce(upi->config_kset).u.value; + char *server = host_ce(upi->config_kset).u.string; + u_int16_t port = port_ce(upi->config_kset).u.value; + char *user = user_ce(upi->config_kset).u.string; + char *pass = pass_ce(upi->config_kset).u.string; + char *db = pass_ce(upi->config_kset).u.string; mi->dbh = mysql_init(NULL); if (!mi->dbh) { @@ -407,157 +199,56 @@ static int open_db(struct ulogd_pluginstance *upi, char *server, return 0; } -static int init_reconnect(struct ulogd_pluginstance *upi) +static int escape_string_mysql(struct ulogd_pluginstance *upi, + char *dst, const char *src, unsigned int len) { struct mysql_instance *mi = (struct mysql_instance *) upi->private; - if (reconnect_ce(upi->config_kset).u.value) { - mi->reconnect = time(NULL); - if (mi->reconnect != TIME_ERR) { - ulogd_log(ULOGD_ERROR, "no connection to database, " - "attempting to reconnect after %u seconds\n", - reconnect_ce(upi->config_kset).u.value); - mi->reconnect += reconnect_ce(upi->config_kset).u.value; - mi->interp = &_mysql_init_db; - return -1; - } - } - - /* Disable plugin permanently */ - ulogd_log(ULOGD_ERROR, "permanently disabling plugin\n"); - mi->interp = &mysql_output_disabled; - - return 0; -} -static int _mysql_init_db(struct ulogd_pluginstance *upi) -{ - struct mysql_instance *mi = (struct mysql_instance *) upi->private; - - if (mi->reconnect && mi->reconnect > time(NULL)) - return 0; - - if (open_db(upi, host_ce(upi->config_kset).u.string, - port_ce(upi->config_kset).u.value, - user_ce(upi->config_kset).u.string, - pass_ce(upi->config_kset).u.string, - db_ce(upi->config_kset).u.string)) { - ulogd_log(ULOGD_ERROR, "can't establish database connection\n"); - return init_reconnect(upi); - } - -#if 0 - /* read the fieldnames to know which values to insert */ - if (mysql_get_columns(table_ce.u.string)) { - ulogd_log(ULOGD_ERROR, "unable to get mysql columns\n"); - return init_reconnect(); - } - mysql_createstmt(); -#endif - /* enable 'real' logging */ - mi->interp = &__interp_mysql; - - mi->reconnect = 0; - - /* call the interpreter function to actually write the - * log line that we wanted to write */ - return __interp_mysql(upi); +#ifdef OLD_MYSQL + return mysql_escape_string(dst, src, len); +#else + return mysql_real_escape_string(mi->dbh, dst, src, len); +#endif /* OLD_MYSQL */ } -static int configure_mysql(struct ulogd_pluginstance *upi, - struct ulogd_pluginstance_stack *stack) +static int execute_mysql(struct ulogd_pluginstance *upi, + const char *stmt, unsigned int len) { struct mysql_instance *mi = (struct mysql_instance *) upi->private; int ret; - ulogd_log(ULOGD_NOTICE, "(re)configuring\n"); - - /* Assign the default interp function */ - mi->interp = &__interp_mysql; - - /* First: Parse configuration file section for this instance */ - ret = config_parse_file(upi->id, upi->config_kset); - if (ret < 0) { - ulogd_log(ULOGD_ERROR, "error parsing config file\n"); - return ret; - } - - /* Second: Open Database */ - ret = open_db(upi, host_ce(upi->config_kset).u.string, - port_ce(upi->config_kset).u.value, - user_ce(upi->config_kset).u.string, - pass_ce(upi->config_kset).u.string, - db_ce(upi->config_kset).u.string); - if (ret < 0) { - ulogd_log(ULOGD_ERROR, "error in open_db\n"); - return ret; - } + ret = mysql_real_query(mi->dbh, stmt, len); + if (ret) + return -1; - /* Third: Determine required input keys for given table */ - ret = mysql_get_columns(upi); - if (ret < 0) - ulogd_log(ULOGD_ERROR, "error in get_columns\n"); - - /* Close database, since ulogd core could just call configure - * but abort during input key resolving routines. configure - * doesn't have a destructor... */ - mysql_close(mi->dbh); - - return ret; + return 0; } -static int start_mysql(struct ulogd_pluginstance *upi) +static char *strerror_mysql(struct ulogd_pluginstance *upi) { struct mysql_instance *mi = (struct mysql_instance *) upi->private; - int ret; - - ulogd_log(ULOGD_NOTICE, "starting\n"); - - ret = open_db(upi, host_ce(upi->config_kset).u.string, - port_ce(upi->config_kset).u.value, - user_ce(upi->config_kset).u.string, - pass_ce(upi->config_kset).u.string, - db_ce(upi->config_kset).u.string); - if (ret < 0) - return ret; - - ret = mysql_createstmt(upi); - if (ret < 0) - mysql_close(mi->dbh); - - return ret; + return (char *) mysql_error(mi->dbh); } -static int stop_mysql(struct ulogd_pluginstance *upi) -{ - struct mysql_instance *mi = (struct mysql_instance *) upi->private; - - ulogd_log(ULOGD_NOTICE, "stopping\n"); - mysql_close(mi->dbh); - - /* try to free our dynamically allocated input key array */ - if (upi->input.keys) { - free(upi->input.keys); - upi->input.keys = NULL; - } - return 0; -} +static struct db_driver db_driver_mysql = { + .get_columns = &get_columns_mysql, + .open_db = &open_db_mysql, + .close_db = &close_db_mysql, + .escape_string = &escape_string_mysql, + .execute = &execute_mysql, + .strerror = &strerror_mysql, +}; -static void signal_mysql(struct ulogd_pluginstance *upi, - int signal) +static int configure_mysql(struct ulogd_pluginstance *upi, + struct ulogd_pluginstance_stack *stack) { - switch (signal) { - case SIGHUP: - /* reopen database connection */ - stop_mysql(upi); - start_mysql(upi); - break; - default: - break; - } -} + struct db_instance *di = (struct db_instance *) &upi->private; + di->driver = &db_driver_mysql; + return configure_db(upi, stack); +} -static struct ulogd_plugin mysql_plugin = { +static struct ulogd_plugin plugin_mysql = { .name = "MYSQL", .input = { .keys = NULL, @@ -567,13 +258,13 @@ static struct ulogd_plugin mysql_plugin = { .output = { .type = ULOGD_DTYPE_SINK, }, - .config_kset = &mysql_kset, + .config_kset = &kset_mysql, .priv_size = sizeof(struct mysql_instance), .configure = &configure_mysql, - .start = &start_mysql, - .stop = &stop_mysql, - .signal = &signal_mysql, - .interp = &interp_mysql, + .start = &start_db, + .stop = &stop_db, + .signal = &signal_db, + .interp = &interp_db, .version = ULOGD_VERSION, }; @@ -581,5 +272,5 @@ void __attribute__ ((constructor)) init(void); void init(void) { - ulogd_register_plugin(&mysql_plugin); + ulogd_register_plugin(&plugin_mysql); } -- cgit v1.2.3