diff options
Diffstat (limited to 'util/db.c')
-rw-r--r-- | util/db.c | 153 |
1 files changed, 141 insertions, 12 deletions
@@ -7,7 +7,7 @@ * Portions (C) 2001 Alex Janssen <alex@ynfonatic.de>, * (C) 2005 Sven Schuster <schuster.sven@gmx.de>, * (C) 2005 Jozsef Kadlecsik <kadlec@blackhole.kfki.hu> - * (C) 2008 Eric Leblond <eric@inl.fr> + * (C) 2008,2013 Eric Leblond <eric@regit.org> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 @@ -32,6 +32,7 @@ #include <arpa/inet.h> #include <time.h> #include <inttypes.h> +#include <pthread.h> #include <ulogd/ulogd.h> #include <ulogd/db.h> @@ -90,6 +91,7 @@ static int sql_createstmt(struct ulogd_pluginstance *upi) ulogd_log(ULOGD_ERROR, "OOM!\n"); return -ENOMEM; } + mi->ring.length = size + 1; if (strncasecmp(procedure,"INSERT", strlen("INSERT")) == 0 && (procedure[strlen("INSERT")] == '\0' || @@ -138,6 +140,8 @@ static int sql_createstmt(struct ulogd_pluginstance *upi) static int _init_db(struct ulogd_pluginstance *upi); +static void *__inject_thread(void *gdi); + int ulogd_db_configure(struct ulogd_pluginstance *upi, struct ulogd_pluginstance_stack *stack) { @@ -185,6 +189,9 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi, di->backlog_full = 0; } + /* check ring option */ + di->ring.size = ringsize_ce(upi->config_kset).u.value; + return ret; } @@ -192,6 +199,7 @@ int ulogd_db_start(struct ulogd_pluginstance *upi) { struct db_instance *di = (struct db_instance *) upi->private; int ret; + unsigned int i; ulogd_log(ULOGD_NOTICE, "starting\n"); @@ -201,11 +209,51 @@ int ulogd_db_start(struct ulogd_pluginstance *upi) ret = sql_createstmt(upi); if (ret < 0) - di->driver->close_db(upi); + goto db_error; + + if (di->ring.size > 0) { + /* allocate */ + di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length); + if (di->ring.ring == NULL) { + ret = -1; + goto db_error; + } + di->ring.wr_place = di->ring.ring; + ulogd_log(ULOGD_NOTICE, + "Allocating %d elements of size %d for ring\n", + di->ring.size, di->ring.length); + /* init start of query for each element */ + for(i = 0; i < di->ring.size; i++) { + strncpy(di->ring.ring + di->ring.length * i + 1, + di->stmt, + strlen(di->stmt)); + } + /* init cond & mutex */ + ret = pthread_cond_init(&di->ring.cond, NULL); + if (ret != 0) + goto alloc_error; + ret = pthread_mutex_init(&di->ring.mutex, NULL); + if (ret != 0) + goto cond_error; + /* create thread */ + ret = pthread_create(&di->db_thread_id, NULL, __inject_thread, upi); + if (ret != 0) + goto mutex_error; + } di->interp = &_init_db; return ret; + +mutex_error: + pthread_mutex_destroy(&di->ring.mutex); +cond_error: + pthread_cond_destroy(&di->ring.cond); +alloc_error: + free(di->ring.ring); +db_error: + di->driver->close_db(upi); + return ret; } static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi) @@ -219,7 +267,13 @@ static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi) free(di->stmt); di->stmt = NULL; } - + if (di->ring.size > 0) { + pthread_cancel(di->db_thread_id); + free(di->ring.ring); + pthread_cond_destroy(&di->ring.cond); + pthread_mutex_destroy(&di->ring.mutex); + di->ring.ring = NULL; + } return 0; } @@ -262,13 +316,13 @@ static int _init_reconnect(struct ulogd_pluginstance *upi) return 0; } -static void __format_query_db(struct ulogd_pluginstance *upi) +static void __format_query_db(struct ulogd_pluginstance *upi, char *start) { struct db_instance *di = (struct db_instance *) &upi->private; unsigned int i; - char *stmt_ins = di->stmt + di->stmt_offset; + char *stmt_ins = start + di->stmt_offset; for (i = 0; i < upi->input.num_keys; i++) { struct ulogd_key *res = upi->input.keys[i].u.source; @@ -279,13 +333,13 @@ static void __format_query_db(struct ulogd_pluginstance *upi) if (!res) ulogd_log(ULOGD_NOTICE, "no source for `%s' ?!?\n", upi->input.keys[i].name); - + if (!res || !IS_VALID(*res)) { /* no result, we have to fake something */ stmt_ins += sprintf(stmt_ins, "NULL,"); continue; } - + switch (res->type) { case ULOGD_RET_INT8: sprintf(stmt_ins, "%d,", res->u.value.i8); @@ -338,7 +392,7 @@ static void __format_query_db(struct ulogd_pluginstance *upi) res->type, upi->input.keys[i].name); break; } - stmt_ins = di->stmt + strlen(di->stmt); + stmt_ins = start + strlen(start); } *(stmt_ins - 1) = ')'; } @@ -388,7 +442,7 @@ static int _init_db(struct ulogd_pluginstance *upi) if (di->reconnect && di->reconnect > time(NULL)) { /* store entry to backlog if it is active */ if (di->backlog_memcap && !di->backlog_full) { - __format_query_db(upi); + __format_query_db(upi, di->stmt); __add_to_backlog(upi, di->stmt, strlen(di->stmt)); } @@ -398,7 +452,7 @@ static int _init_db(struct ulogd_pluginstance *upi) if (di->driver->open_db(upi)) { ulogd_log(ULOGD_ERROR, "can't establish database connection\n"); if (di->backlog_memcap && !di->backlog_full) { - __format_query_db(upi); + __format_query_db(upi, di->stmt); __add_to_backlog(upi, di->stmt, strlen(di->stmt)); } return _init_reconnect(upi); @@ -442,14 +496,39 @@ static int __treat_backlog(struct ulogd_pluginstance *upi) return 0; } +static int __add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di) +{ + if (*di->ring.wr_place == RING_QUERY_READY) { + if (di->ring.full == 0) { + ulogd_log(ULOGD_ERROR, "No place left in ring\n"); + di->ring.full = 1; + } + return ULOGD_IRET_OK; + } else if (di->ring.full) { + ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n"); + di->ring.full = 0; + } + __format_query_db(upi, di->ring.wr_place + 1); + *di->ring.wr_place = RING_QUERY_READY; + pthread_cond_signal(&di->ring.cond); + di->ring.wr_item ++; + di->ring.wr_place += di->ring.length; + if (di->ring.wr_item == di->ring.size) { + di->ring.wr_item = 0; + di->ring.wr_place = di->ring.ring; + } + return ULOGD_IRET_OK; +} + /* our main output function, called by ulogd */ static int __interp_db(struct ulogd_pluginstance *upi) { struct db_instance *di = (struct db_instance *) &upi->private; + if (di->ring.size) + return __add_to_ring(upi, di); - __format_query_db(upi); - /* now we have created our statement, insert it */ + __format_query_db(upi, di->stmt); /* if backup log is not empty we add current query to it */ if (!llist_empty(&di->backlog)) { @@ -475,6 +554,56 @@ static int __interp_db(struct ulogd_pluginstance *upi) return 0; } +static int __loop_reconnect_db(struct ulogd_pluginstance * upi) { + struct db_instance *di = (struct db_instance *) &upi->private; + + di->driver->close_db(upi); + while (1) { + if (di->driver->open_db(upi)) { + sleep(1); + } else { + return 0; + } + } + return 0; +} + +static void *__inject_thread(void *gdi) +{ + struct ulogd_pluginstance *upi = (struct ulogd_pluginstance *) gdi; + struct db_instance *di = (struct db_instance *) &upi->private; + char *wr_place; + + wr_place = di->ring.ring; + pthread_mutex_lock(&di->ring.mutex); + while(1) { + /* wait cond */ + pthread_cond_wait(&di->ring.cond, &di->ring.mutex); + while (*wr_place == RING_QUERY_READY) { + if (di->driver->execute(upi, wr_place + 1, + strlen(wr_place + 1)) < 0) { + if (__loop_reconnect_db(upi) != 0) { + /* loop has failed on unrecoverable error */ + ulogd_log(ULOGD_ERROR, + "permanently disabling plugin\n"); + di->interp = &disabled_interp_db; + return NULL; + } + } + *wr_place = RING_NO_QUERY; + di->ring.rd_item++; + if (di->ring.rd_item == di->ring.size) { + di->ring.rd_item = 0; + wr_place = di->ring.ring; + } else + wr_place += di->ring.length; + } + } + + return NULL; +} + + void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal) { switch (signal) { |