From 8da465aadbeeef3be8e4bab070b101755dc8debc Mon Sep 17 00:00:00 2001 From: laforge Date: Sat, 8 Oct 2005 10:40:21 +0000 Subject: merge changes from 1.x --- output/mysql/ulogd_MYSQL.c | 204 +++++++++++++++++++++++++++++++-------------- 1 file changed, 143 insertions(+), 61 deletions(-) (limited to 'output/mysql') diff --git a/output/mysql/ulogd_MYSQL.c b/output/mysql/ulogd_MYSQL.c index 78c799d..9014a29 100644 --- a/output/mysql/ulogd_MYSQL.c +++ b/output/mysql/ulogd_MYSQL.c @@ -1,4 +1,4 @@ -/* ulogd_MYSQL.c, Version $Revision: 1.15 $ +/* ulogd_MYSQL.c, Version $Revision$ * * ulogd output plugin for logging to a MySQL database * @@ -29,10 +29,17 @@ * See ulogd/doc/mysql.table.ipaddr-as-string as an example. * BE WARNED: This has _WAY_ less performance during table searches. * + * 09 Feb 2005, Sven Schuster : + * Added the "port" parameter to specify ports different from 3306 + * + * 12 May 2005, Jozsef Kadlecsik + * Added reconnecting to lost mysql server. */ #include #include +#include +#include #include #include #include @@ -49,6 +56,9 @@ struct _field { struct _field *next; }; +/* The plugin handler */ +static ulog_output_t mysql_plugin; + /* the database handle we are using */ static MYSQL *dbh; @@ -64,53 +74,74 @@ static char *stmt_val; /* pointer to current inser position in statement */ static char *stmt_ins; +/* Attempt to reconnect if connection is lost */ +time_t reconnect = 0; +#define TIME_ERR ((time_t)-1) /* Be paranoid */ + /* our configuration directives */ -static struct config_entry mysql_ces[] = { - { - .key = "db", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, - }, - { - .key = "host", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, - }, - { - .key = "user", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, - }, - { - .key = "pass", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, - }, - { - .key = "table", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, - }, +static config_entry_t db_ce = { + .key = "db", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, +}; + +static config_entry_t host_ce = { + .next = &db_ce, + .key = "host", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, +}; + +static config_entry_t user_ce = { + .next = &host_ce, + .key = "user", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, +}; + +static config_entry_t pass_ce = { + .next = &user_ce, + .key = "pass", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, +}; + +static config_entry_t table_ce = { + .next = &pass_ce, + .key = "table", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, }; -static struct config_keyset mysql_keyset = { - .num_ces = sizeof(mysql_ces)/sizeof(struct config_entry), - .ces = &mysql_ces, +static config_entry_t port_ce = { + .next = &table_ce, + .key = "port", + .type = CONFIG_TYPE_INT, }; +static config_entry_t reconnect_ce = { + .next = &port_ce, + .key = "reconnect", + .type = CONFIG_TYPE_INT, +}; + +static config_entry_t connect_timeout_ce = { + .next = &reconnect_ce, + .key = "connect_timeout", + .type = CONFIG_TYPE_INT, +}; -#define db_ce(x) x[0] -#define host_ce(x) x[1] -#define user_ce(x) x[2] -#define pass_ce(x) x[3] -#define table_ce(x) x[4] +static int _mysql_init_db(ulog_iret_t *result); /* our main output function, called by ulogd */ static int mysql_output(ulog_iret_t *result) { struct _field *f; ulog_iret_t *res; +#ifdef IP_AS_STRING char *tmpstr; /* need this for --log-ip-as-string */ + struct in_addr addr; +#endif stmt_ins = stmt_val; @@ -150,8 +181,10 @@ static int mysql_output(ulog_iret_t *result) break; case ULOGD_RET_IPADDR: #ifdef IP_AS_STRING + memset(&addr, 0, sizeof(addr)); + addr.s_addr = ntohl(res->value.ui32); *stmt_ins++ = '\''; - tmpstr = inet_ntoa(ntohl(res->value.ui32)); + tmpstr = inet_ntoa(addr); #ifdef OLD_MYSQL mysql_escape_string(stmt_ins, tmpstr, strlen(tmpstr)); @@ -206,15 +239,21 @@ static int mysql_output(ulog_iret_t *result) /* now we have created our statement, insert it */ - if(mysql_real_query(dbh, stmt, strlen(stmt))) { + if (mysql_real_query(dbh, stmt, strlen(stmt))) { ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", mysql_error(dbh)); - return 1; + return _mysql_init_db(result); } return 0; } +/* no connection, plugin disabled */ +static int mysql_output_disabled(ulog_iret_t *result) +{ + return 0; +} + #define MYSQL_INSERTTEMPL "insert into X (Y) values (Z)" #define MYSQL_VALSIZE 100 @@ -226,11 +265,8 @@ static int mysql_createstmt(void) char buf[ULOGD_MAX_KEYLEN]; char *underscore; - if (stmt) { - ulogd_log(ULOGD_NOTICE, "createstmt called, but stmt" - " already existing\n"); - return 1; - } + if (stmt) + free(stmt); /* caclulate the size for the insert statement */ size = strlen(MYSQL_INSERTTEMPL) + strlen(table_ce.u.string); @@ -247,7 +283,7 @@ static int mysql_createstmt(void) if (!stmt) { ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; + return -1; } sprintf(stmt, "insert into %s (", table_ce.u.string); @@ -281,11 +317,18 @@ static int mysql_get_columns(const char *table) int id; if (!dbh) - return 1; + return -1; result = mysql_list_fields(dbh, table, NULL); if (!result) - return 1; + return -1; + + /* Cleanup before reconnect */ + while (fields) { + f = fields; + fields = f->next; + free(f); + } while ((field = mysql_fetch_field(result))) { @@ -307,7 +350,7 @@ static int mysql_get_columns(const char *table) f = (struct _field *) malloc(sizeof *f); if (!f) { ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; + return -1; } strncpy(f->name, buf, ULOGD_MAX_KEYLEN); f->id = id; @@ -320,40 +363,80 @@ static int mysql_get_columns(const char *table) } /* make connection and select database */ -static int mysql_open_db(char *server, char *user, char *pass, char *db) +static int mysql_open_db(char *server, int port, char *user, char *pass, + char *db) { dbh = mysql_init(NULL); if (!dbh) - return 1; + return -1; - if (!mysql_real_connect(dbh, server, user, pass, db, 0, NULL, 0)) - return 1; + if (connect_timeout_ce.u.value) + mysql_options(dbh, MYSQL_OPT_CONNECT_TIMEOUT, (const char *) &connect_timeout_ce.u.value); + + if (!mysql_real_connect(dbh, server, user, pass, db, port, NULL, 0)) + return -1; return 0; } -static int mysql_init(void) +static int init_reconnect(void) { - /* have the opts parsed */ - config_parse_file("MYSQL", &table_ce); + if (reconnect_ce.u.value) { + reconnect = time(NULL); + if (reconnect != TIME_ERR) { + ulogd_log(ULOGD_ERROR, "no connection to database, " + "attempting to reconnect " + "after %u seconds\n", + reconnect_ce.u.value); + reconnect += reconnect_ce.u.value; + mysql_plugin.output = &_mysql_init_db; + return -1; + } + } + /* Disable plugin permanently */ + mysql_plugin.output = &mysql_output_disabled; + + return 0; +} - if (mysql_open_db(host_ce.u.string, user_ce.u.string, +static int _mysql_init_db(ulog_iret_t *result) +{ + if (reconnect && reconnect > time(NULL)) + return 0; + + if (mysql_open_db(host_ce.u.string, port_ce.u.value, user_ce.u.string, pass_ce.u.string, db_ce.u.string)) { ulogd_log(ULOGD_ERROR, "can't establish database connection\n"); - return -1; + return init_reconnect(); } /* read the fieldnames to know which values to insert */ if (mysql_get_columns(table_ce.u.string)) { ulogd_log(ULOGD_ERROR, "unable to get mysql columns\n"); - return -1; + return init_reconnect(); } mysql_createstmt(); + + /* enable plugin */ + mysql_plugin.output = &mysql_output; + + reconnect = 0; + if (result) + return mysql_output(result); + return 0; } -static void mysql_fini(void) +static int _mysql_init(void) +{ + /* have the opts parsed */ + config_parse_file("MYSQL", &connect_timeout_ce); + + return _mysql_init_db(NULL); +} + +static void _mysql_fini(void) { mysql_close(dbh); } @@ -361,9 +444,8 @@ static void mysql_fini(void) static ulog_output_t mysql_plugin = { .name = "mysql", .output = &mysql_output, - .init = &mysql_init, - .fini = &mysql_fini, - .config_kset = &mysql_keyset, + .init = &_mysql_init, + .fini = &_mysql_fini, }; void _init(void) -- cgit v1.2.3