summaryrefslogtreecommitdiffstats
path: root/util
diff options
context:
space:
mode:
authorEric Leblond <eric@regit.org>2013-04-20 12:44:17 +0200
committerEric Leblond <eric@regit.org>2013-05-21 19:57:02 +0200
commitbc817c2077ddfb37041a20cf7c71bc71b4d56003 (patch)
treed840879a6649955ab00ec76bd880c7920f69c651 /util
parent1d84cdeaa56fe003c4beb0452c8bc476ecfd75a8 (diff)
db: add ring buffer for DB query
This patch adds an optional ring buffer option which modify the way database queries are made. The main thread is only handling kernel message reading and query formatting. The SQL request is made in a separate dedicated thread. The idea is to try to avoid buffer overrun by minimizing the time requested to treat kernel message. Doing synchronous SQL request, as it was made before was causing a delay which could cause some messages to be lost in case of burst from kernel side.
Diffstat (limited to 'util')
-rw-r--r--util/db.c153
1 files changed, 141 insertions, 12 deletions
diff --git a/util/db.c b/util/db.c
index 14d9481..dae8897 100644
--- a/util/db.c
+++ b/util/db.c
@@ -7,7 +7,7 @@
* Portions (C) 2001 Alex Janssen <alex@ynfonatic.de>,
* (C) 2005 Sven Schuster <schuster.sven@gmx.de>,
* (C) 2005 Jozsef Kadlecsik <kadlec@blackhole.kfki.hu>
- * (C) 2008 Eric Leblond <eric@inl.fr>
+ * (C) 2008,2013 Eric Leblond <eric@regit.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
@@ -32,6 +32,7 @@
#include <arpa/inet.h>
#include <time.h>
#include <inttypes.h>
+#include <pthread.h>
#include <ulogd/ulogd.h>
#include <ulogd/db.h>
@@ -90,6 +91,7 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
ulogd_log(ULOGD_ERROR, "OOM!\n");
return -ENOMEM;
}
+ mi->ring.length = size + 1;
if (strncasecmp(procedure,"INSERT", strlen("INSERT")) == 0 &&
(procedure[strlen("INSERT")] == '\0' ||
@@ -138,6 +140,8 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
static int _init_db(struct ulogd_pluginstance *upi);
+static void *__inject_thread(void *gdi);
+
int ulogd_db_configure(struct ulogd_pluginstance *upi,
struct ulogd_pluginstance_stack *stack)
{
@@ -185,6 +189,9 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi,
di->backlog_full = 0;
}
+ /* check ring option */
+ di->ring.size = ringsize_ce(upi->config_kset).u.value;
+
return ret;
}
@@ -192,6 +199,7 @@ int ulogd_db_start(struct ulogd_pluginstance *upi)
{
struct db_instance *di = (struct db_instance *) upi->private;
int ret;
+ unsigned int i;
ulogd_log(ULOGD_NOTICE, "starting\n");
@@ -201,11 +209,51 @@ int ulogd_db_start(struct ulogd_pluginstance *upi)
ret = sql_createstmt(upi);
if (ret < 0)
- di->driver->close_db(upi);
+ goto db_error;
+
+ if (di->ring.size > 0) {
+ /* allocate */
+ di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length);
+ if (di->ring.ring == NULL) {
+ ret = -1;
+ goto db_error;
+ }
+ di->ring.wr_place = di->ring.ring;
+ ulogd_log(ULOGD_NOTICE,
+ "Allocating %d elements of size %d for ring\n",
+ di->ring.size, di->ring.length);
+ /* init start of query for each element */
+ for(i = 0; i < di->ring.size; i++) {
+ strncpy(di->ring.ring + di->ring.length * i + 1,
+ di->stmt,
+ strlen(di->stmt));
+ }
+ /* init cond & mutex */
+ ret = pthread_cond_init(&di->ring.cond, NULL);
+ if (ret != 0)
+ goto alloc_error;
+ ret = pthread_mutex_init(&di->ring.mutex, NULL);
+ if (ret != 0)
+ goto cond_error;
+ /* create thread */
+ ret = pthread_create(&di->db_thread_id, NULL, __inject_thread, upi);
+ if (ret != 0)
+ goto mutex_error;
+ }
di->interp = &_init_db;
return ret;
+
+mutex_error:
+ pthread_mutex_destroy(&di->ring.mutex);
+cond_error:
+ pthread_cond_destroy(&di->ring.cond);
+alloc_error:
+ free(di->ring.ring);
+db_error:
+ di->driver->close_db(upi);
+ return ret;
}
static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi)
@@ -219,7 +267,13 @@ static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi)
free(di->stmt);
di->stmt = NULL;
}
-
+ if (di->ring.size > 0) {
+ pthread_cancel(di->db_thread_id);
+ free(di->ring.ring);
+ pthread_cond_destroy(&di->ring.cond);
+ pthread_mutex_destroy(&di->ring.mutex);
+ di->ring.ring = NULL;
+ }
return 0;
}
@@ -262,13 +316,13 @@ static int _init_reconnect(struct ulogd_pluginstance *upi)
return 0;
}
-static void __format_query_db(struct ulogd_pluginstance *upi)
+static void __format_query_db(struct ulogd_pluginstance *upi, char *start)
{
struct db_instance *di = (struct db_instance *) &upi->private;
unsigned int i;
- char *stmt_ins = di->stmt + di->stmt_offset;
+ char *stmt_ins = start + di->stmt_offset;
for (i = 0; i < upi->input.num_keys; i++) {
struct ulogd_key *res = upi->input.keys[i].u.source;
@@ -279,13 +333,13 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
if (!res)
ulogd_log(ULOGD_NOTICE, "no source for `%s' ?!?\n",
upi->input.keys[i].name);
-
+
if (!res || !IS_VALID(*res)) {
/* no result, we have to fake something */
stmt_ins += sprintf(stmt_ins, "NULL,");
continue;
}
-
+
switch (res->type) {
case ULOGD_RET_INT8:
sprintf(stmt_ins, "%d,", res->u.value.i8);
@@ -338,7 +392,7 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
res->type, upi->input.keys[i].name);
break;
}
- stmt_ins = di->stmt + strlen(di->stmt);
+ stmt_ins = start + strlen(start);
}
*(stmt_ins - 1) = ')';
}
@@ -388,7 +442,7 @@ static int _init_db(struct ulogd_pluginstance *upi)
if (di->reconnect && di->reconnect > time(NULL)) {
/* store entry to backlog if it is active */
if (di->backlog_memcap && !di->backlog_full) {
- __format_query_db(upi);
+ __format_query_db(upi, di->stmt);
__add_to_backlog(upi, di->stmt,
strlen(di->stmt));
}
@@ -398,7 +452,7 @@ static int _init_db(struct ulogd_pluginstance *upi)
if (di->driver->open_db(upi)) {
ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
if (di->backlog_memcap && !di->backlog_full) {
- __format_query_db(upi);
+ __format_query_db(upi, di->stmt);
__add_to_backlog(upi, di->stmt, strlen(di->stmt));
}
return _init_reconnect(upi);
@@ -442,14 +496,39 @@ static int __treat_backlog(struct ulogd_pluginstance *upi)
return 0;
}
+static int __add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di)
+{
+ if (*di->ring.wr_place == RING_QUERY_READY) {
+ if (di->ring.full == 0) {
+ ulogd_log(ULOGD_ERROR, "No place left in ring\n");
+ di->ring.full = 1;
+ }
+ return ULOGD_IRET_OK;
+ } else if (di->ring.full) {
+ ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n");
+ di->ring.full = 0;
+ }
+ __format_query_db(upi, di->ring.wr_place + 1);
+ *di->ring.wr_place = RING_QUERY_READY;
+ pthread_cond_signal(&di->ring.cond);
+ di->ring.wr_item ++;
+ di->ring.wr_place += di->ring.length;
+ if (di->ring.wr_item == di->ring.size) {
+ di->ring.wr_item = 0;
+ di->ring.wr_place = di->ring.ring;
+ }
+ return ULOGD_IRET_OK;
+}
+
/* our main output function, called by ulogd */
static int __interp_db(struct ulogd_pluginstance *upi)
{
struct db_instance *di = (struct db_instance *) &upi->private;
+ if (di->ring.size)
+ return __add_to_ring(upi, di);
- __format_query_db(upi);
- /* now we have created our statement, insert it */
+ __format_query_db(upi, di->stmt);
/* if backup log is not empty we add current query to it */
if (!llist_empty(&di->backlog)) {
@@ -475,6 +554,56 @@ static int __interp_db(struct ulogd_pluginstance *upi)
return 0;
}
+static int __loop_reconnect_db(struct ulogd_pluginstance * upi) {
+ struct db_instance *di = (struct db_instance *) &upi->private;
+
+ di->driver->close_db(upi);
+ while (1) {
+ if (di->driver->open_db(upi)) {
+ sleep(1);
+ } else {
+ return 0;
+ }
+ }
+ return 0;
+}
+
+static void *__inject_thread(void *gdi)
+{
+ struct ulogd_pluginstance *upi = (struct ulogd_pluginstance *) gdi;
+ struct db_instance *di = (struct db_instance *) &upi->private;
+ char *wr_place;
+
+ wr_place = di->ring.ring;
+ pthread_mutex_lock(&di->ring.mutex);
+ while(1) {
+ /* wait cond */
+ pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
+ while (*wr_place == RING_QUERY_READY) {
+ if (di->driver->execute(upi, wr_place + 1,
+ strlen(wr_place + 1)) < 0) {
+ if (__loop_reconnect_db(upi) != 0) {
+ /* loop has failed on unrecoverable error */
+ ulogd_log(ULOGD_ERROR,
+ "permanently disabling plugin\n");
+ di->interp = &disabled_interp_db;
+ return NULL;
+ }
+ }
+ *wr_place = RING_NO_QUERY;
+ di->ring.rd_item++;
+ if (di->ring.rd_item == di->ring.size) {
+ di->ring.rd_item = 0;
+ wr_place = di->ring.ring;
+ } else
+ wr_place += di->ring.length;
+ }
+ }
+
+ return NULL;
+}
+
+
void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal)
{
switch (signal) {