summaryrefslogtreecommitdiffstats
path: root/util
diff options
context:
space:
mode:
Diffstat (limited to 'util')
-rw-r--r--util/db.c153
1 files changed, 141 insertions, 12 deletions
diff --git a/util/db.c b/util/db.c
index 14d9481..dae8897 100644
--- a/util/db.c
+++ b/util/db.c
@@ -7,7 +7,7 @@
* Portions (C) 2001 Alex Janssen <alex@ynfonatic.de>,
* (C) 2005 Sven Schuster <schuster.sven@gmx.de>,
* (C) 2005 Jozsef Kadlecsik <kadlec@blackhole.kfki.hu>
- * (C) 2008 Eric Leblond <eric@inl.fr>
+ * (C) 2008,2013 Eric Leblond <eric@regit.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
@@ -32,6 +32,7 @@
#include <arpa/inet.h>
#include <time.h>
#include <inttypes.h>
+#include <pthread.h>
#include <ulogd/ulogd.h>
#include <ulogd/db.h>
@@ -90,6 +91,7 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
ulogd_log(ULOGD_ERROR, "OOM!\n");
return -ENOMEM;
}
+ mi->ring.length = size + 1;
if (strncasecmp(procedure,"INSERT", strlen("INSERT")) == 0 &&
(procedure[strlen("INSERT")] == '\0' ||
@@ -138,6 +140,8 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
static int _init_db(struct ulogd_pluginstance *upi);
+static void *__inject_thread(void *gdi);
+
int ulogd_db_configure(struct ulogd_pluginstance *upi,
struct ulogd_pluginstance_stack *stack)
{
@@ -185,6 +189,9 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi,
di->backlog_full = 0;
}
+ /* check ring option */
+ di->ring.size = ringsize_ce(upi->config_kset).u.value;
+
return ret;
}
@@ -192,6 +199,7 @@ int ulogd_db_start(struct ulogd_pluginstance *upi)
{
struct db_instance *di = (struct db_instance *) upi->private;
int ret;
+ unsigned int i;
ulogd_log(ULOGD_NOTICE, "starting\n");
@@ -201,11 +209,51 @@ int ulogd_db_start(struct ulogd_pluginstance *upi)
ret = sql_createstmt(upi);
if (ret < 0)
- di->driver->close_db(upi);
+ goto db_error;
+
+ if (di->ring.size > 0) {
+ /* allocate */
+ di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length);
+ if (di->ring.ring == NULL) {
+ ret = -1;
+ goto db_error;
+ }
+ di->ring.wr_place = di->ring.ring;
+ ulogd_log(ULOGD_NOTICE,
+ "Allocating %d elements of size %d for ring\n",
+ di->ring.size, di->ring.length);
+ /* init start of query for each element */
+ for(i = 0; i < di->ring.size; i++) {
+ strncpy(di->ring.ring + di->ring.length * i + 1,
+ di->stmt,
+ strlen(di->stmt));
+ }
+ /* init cond & mutex */
+ ret = pthread_cond_init(&di->ring.cond, NULL);
+ if (ret != 0)
+ goto alloc_error;
+ ret = pthread_mutex_init(&di->ring.mutex, NULL);
+ if (ret != 0)
+ goto cond_error;
+ /* create thread */
+ ret = pthread_create(&di->db_thread_id, NULL, __inject_thread, upi);
+ if (ret != 0)
+ goto mutex_error;
+ }
di->interp = &_init_db;
return ret;
+
+mutex_error:
+ pthread_mutex_destroy(&di->ring.mutex);
+cond_error:
+ pthread_cond_destroy(&di->ring.cond);
+alloc_error:
+ free(di->ring.ring);
+db_error:
+ di->driver->close_db(upi);
+ return ret;
}
static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi)
@@ -219,7 +267,13 @@ static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi)
free(di->stmt);
di->stmt = NULL;
}
-
+ if (di->ring.size > 0) {
+ pthread_cancel(di->db_thread_id);
+ free(di->ring.ring);
+ pthread_cond_destroy(&di->ring.cond);
+ pthread_mutex_destroy(&di->ring.mutex);
+ di->ring.ring = NULL;
+ }
return 0;
}
@@ -262,13 +316,13 @@ static int _init_reconnect(struct ulogd_pluginstance *upi)
return 0;
}
-static void __format_query_db(struct ulogd_pluginstance *upi)
+static void __format_query_db(struct ulogd_pluginstance *upi, char *start)
{
struct db_instance *di = (struct db_instance *) &upi->private;
unsigned int i;
- char *stmt_ins = di->stmt + di->stmt_offset;
+ char *stmt_ins = start + di->stmt_offset;
for (i = 0; i < upi->input.num_keys; i++) {
struct ulogd_key *res = upi->input.keys[i].u.source;
@@ -279,13 +333,13 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
if (!res)
ulogd_log(ULOGD_NOTICE, "no source for `%s' ?!?\n",
upi->input.keys[i].name);
-
+
if (!res || !IS_VALID(*res)) {
/* no result, we have to fake something */
stmt_ins += sprintf(stmt_ins, "NULL,");
continue;
}
-
+
switch (res->type) {
case ULOGD_RET_INT8:
sprintf(stmt_ins, "%d,", res->u.value.i8);
@@ -338,7 +392,7 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
res->type, upi->input.keys[i].name);
break;
}
- stmt_ins = di->stmt + strlen(di->stmt);
+ stmt_ins = start + strlen(start);
}
*(stmt_ins - 1) = ')';
}
@@ -388,7 +442,7 @@ static int _init_db(struct ulogd_pluginstance *upi)
if (di->reconnect && di->reconnect > time(NULL)) {
/* store entry to backlog if it is active */
if (di->backlog_memcap && !di->backlog_full) {
- __format_query_db(upi);
+ __format_query_db(upi, di->stmt);
__add_to_backlog(upi, di->stmt,
strlen(di->stmt));
}
@@ -398,7 +452,7 @@ static int _init_db(struct ulogd_pluginstance *upi)
if (di->driver->open_db(upi)) {
ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
if (di->backlog_memcap && !di->backlog_full) {
- __format_query_db(upi);
+ __format_query_db(upi, di->stmt);
__add_to_backlog(upi, di->stmt, strlen(di->stmt));
}
return _init_reconnect(upi);
@@ -442,14 +496,39 @@ static int __treat_backlog(struct ulogd_pluginstance *upi)
return 0;
}
+static int __add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di)
+{
+ if (*di->ring.wr_place == RING_QUERY_READY) {
+ if (di->ring.full == 0) {
+ ulogd_log(ULOGD_ERROR, "No place left in ring\n");
+ di->ring.full = 1;
+ }
+ return ULOGD_IRET_OK;
+ } else if (di->ring.full) {
+ ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n");
+ di->ring.full = 0;
+ }
+ __format_query_db(upi, di->ring.wr_place + 1);
+ *di->ring.wr_place = RING_QUERY_READY;
+ pthread_cond_signal(&di->ring.cond);
+ di->ring.wr_item ++;
+ di->ring.wr_place += di->ring.length;
+ if (di->ring.wr_item == di->ring.size) {
+ di->ring.wr_item = 0;
+ di->ring.wr_place = di->ring.ring;
+ }
+ return ULOGD_IRET_OK;
+}
+
/* our main output function, called by ulogd */
static int __interp_db(struct ulogd_pluginstance *upi)
{
struct db_instance *di = (struct db_instance *) &upi->private;
+ if (di->ring.size)
+ return __add_to_ring(upi, di);
- __format_query_db(upi);
- /* now we have created our statement, insert it */
+ __format_query_db(upi, di->stmt);
/* if backup log is not empty we add current query to it */
if (!llist_empty(&di->backlog)) {
@@ -475,6 +554,56 @@ static int __interp_db(struct ulogd_pluginstance *upi)
return 0;
}
+static int __loop_reconnect_db(struct ulogd_pluginstance * upi) {
+ struct db_instance *di = (struct db_instance *) &upi->private;
+
+ di->driver->close_db(upi);
+ while (1) {
+ if (di->driver->open_db(upi)) {
+ sleep(1);
+ } else {
+ return 0;
+ }
+ }
+ return 0;
+}
+
+static void *__inject_thread(void *gdi)
+{
+ struct ulogd_pluginstance *upi = (struct ulogd_pluginstance *) gdi;
+ struct db_instance *di = (struct db_instance *) &upi->private;
+ char *wr_place;
+
+ wr_place = di->ring.ring;
+ pthread_mutex_lock(&di->ring.mutex);
+ while(1) {
+ /* wait cond */
+ pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
+ while (*wr_place == RING_QUERY_READY) {
+ if (di->driver->execute(upi, wr_place + 1,
+ strlen(wr_place + 1)) < 0) {
+ if (__loop_reconnect_db(upi) != 0) {
+ /* loop has failed on unrecoverable error */
+ ulogd_log(ULOGD_ERROR,
+ "permanently disabling plugin\n");
+ di->interp = &disabled_interp_db;
+ return NULL;
+ }
+ }
+ *wr_place = RING_NO_QUERY;
+ di->ring.rd_item++;
+ if (di->ring.rd_item == di->ring.size) {
+ di->ring.rd_item = 0;
+ wr_place = di->ring.ring;
+ } else
+ wr_place += di->ring.length;
+ }
+ }
+
+ return NULL;
+}
+
+
void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal)
{
switch (signal) {