summaryrefslogtreecommitdiffstats
path: root/util/db.c
diff options
context:
space:
mode:
authorEric Leblond <eric@regit.org>2013-03-17 19:41:36 +0100
committerEric Leblond <eric@regit.org>2013-05-21 19:47:53 +0200
commit6e4d8af18923ec4873d96228ccf24eda3da4577e (patch)
tree3e62f9de5e185124928578454f29d8586d59fb85 /util/db.c
parent33136bc95407aab889778bc4ebba9622bc4c8f63 (diff)
db: store data in memory during database downtime
This patch is adding a mechanism to store query in a backlog build in memory. This allow to store events during downtime in memory and realize the effective insertion when the database comes back. A memory cap is used to avoid any memory flooding.
Diffstat (limited to 'util/db.c')
-rw-r--r--util/db.c170
1 files changed, 142 insertions, 28 deletions
diff --git a/util/db.c b/util/db.c
index 0d8b9c1..655a3ff 100644
--- a/util/db.c
+++ b/util/db.c
@@ -167,7 +167,22 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi,
* but abort during input key resolving routines. configure
* doesn't have a destructor... */
di->driver->close_db(upi);
+
+ INIT_LLIST_HEAD(&di->backlog);
+ di->backlog_memusage = 0;
+ di->backlog_memcap = backlog_memcap_ce(upi->config_kset).u.value;
+ if (di->backlog_memcap > 0) {
+ di->backlog_oneshot = backlog_oneshot_ce(upi->config_kset).u.value;
+ if (di->backlog_oneshot <= 2) {
+ ulogd_log(ULOGD_ERROR,
+ "backlog_oneshot_requests must be > 2 to hope"
+ " cleaning. Setting it to 3.\n");
+ di->backlog_oneshot = 3;
+ }
+ di->backlog_full = 0;
+ }
+
return ret;
}
@@ -245,38 +260,15 @@ static int _init_reconnect(struct ulogd_pluginstance *upi)
return 0;
}
-static int _init_db(struct ulogd_pluginstance *upi)
-{
- struct db_instance *di = (struct db_instance *) upi->private;
-
- if (di->reconnect && di->reconnect > time(NULL))
- return 0;
-
- if (di->driver->open_db(upi)) {
- ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
- return _init_reconnect(upi);
- }
-
- /* enable 'real' logging */
- di->interp = &__interp_db;
-
- di->reconnect = 0;
-
- /* call the interpreter function to actually write the
- * log line that we wanted to write */
- return __interp_db(upi);
-}
-
-
-/* our main output function, called by ulogd */
-static int __interp_db(struct ulogd_pluginstance *upi)
+static void __format_query_db(struct ulogd_pluginstance *upi)
{
struct db_instance *di = (struct db_instance *) &upi->private;
+
unsigned int i;
di->stmt_ins = di->stmt_val;
- for (i = 0; i < upi->input.num_keys; i++) {
+ for (i = 0; i < upi->input.num_keys; i++) {
struct ulogd_key *res = upi->input.keys[i].u.source;
if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE)
@@ -325,8 +317,8 @@ static int __interp_db(struct ulogd_pluginstance *upi)
case ULOGD_RET_STRING:
*(di->stmt_ins++) = '\'';
if (res->u.value.ptr) {
- di->stmt_ins +=
- di->driver->escape_string(upi, di->stmt_ins,
+ di->stmt_ins +=
+ di->driver->escape_string(upi, di->stmt_ins,
res->u.value.ptr,
strlen(res->u.value.ptr));
}
@@ -347,10 +339,132 @@ static int __interp_db(struct ulogd_pluginstance *upi)
di->stmt_ins = di->stmt + strlen(di->stmt);
}
*(di->stmt_ins - 1) = ')';
+}
+
+static int __add_to_backlog(struct ulogd_pluginstance *upi, const char *stmt, unsigned int len)
+{
+ struct db_instance *di = (struct db_instance *) &upi->private;
+ struct db_stmt *query;
+ /* check if we are using backlog */
+ if (di->backlog_memcap == 0)
+ return 0;
+
+ /* check len against backlog */
+ if (len + di->backlog_memusage > di->backlog_memcap) {
+ if (di->backlog_full == 0)
+ ulogd_log(ULOGD_ERROR,
+ "Backlog is full starting to reject events.\n");
+ di->backlog_full = 1;
+ return -1;
+ }
+
+ query = malloc(sizeof(struct db_stmt));
+ if (query == NULL)
+ return -1;
+
+ query->stmt = strndup(stmt, len);
+ query->len = len;
+
+ if (query->stmt == NULL) {
+ free(query);
+ return -1;
+ }
+
+ di->backlog_memusage += len + sizeof(struct db_stmt);
+ di->backlog_full = 0;
+
+ llist_add_tail(&query->list, &di->backlog);
+
+ return 0;
+}
+
+static int _init_db(struct ulogd_pluginstance *upi)
+{
+ struct db_instance *di = (struct db_instance *) upi->private;
+
+ if (di->reconnect && di->reconnect > time(NULL)) {
+ /* store entry to backlog if it is active */
+ if (di->backlog_memcap && !di->backlog_full) {
+ __format_query_db(upi);
+ __add_to_backlog(upi, di->stmt,
+ strlen(di->stmt));
+ }
+ return 0;
+ }
+
+ if (di->driver->open_db(upi)) {
+ ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
+ if (di->backlog_memcap && !di->backlog_full) {
+ __format_query_db(upi);
+ __add_to_backlog(upi, di->stmt, strlen(di->stmt));
+ }
+ return _init_reconnect(upi);
+ }
+
+ /* enable 'real' logging */
+ di->interp = &__interp_db;
+
+ di->reconnect = 0;
+
+ /* call the interpreter function to actually write the
+ * log line that we wanted to write */
+ return __interp_db(upi);
+}
+
+static int __treat_backlog(struct ulogd_pluginstance *upi)
+{
+ struct db_instance *di = (struct db_instance *) &upi->private;
+ int i = di->backlog_oneshot;
+ struct db_stmt *query;
+ struct db_stmt *nquery;
+
+ /* Don't try reconnect before timeout */
+ if (di->reconnect && di->reconnect > time(NULL))
+ return 0;
+
+ llist_for_each_entry_safe(query, nquery, &di->backlog, list) {
+ if (di->driver->execute(upi, query->stmt, query->len) < 0) {
+ /* error occur, database connexion need to be closed */
+ di->driver->close_db(upi);
+ return _init_reconnect(upi);
+ } else {
+ di->backlog_memusage -= query->len + sizeof(struct db_stmt);
+ llist_del(&query->list);
+ free(query->stmt);
+ free(query);
+ }
+ if (--i < 0)
+ break;
+ }
+ return 0;
+}
+
+/* our main output function, called by ulogd */
+static int __interp_db(struct ulogd_pluginstance *upi)
+{
+ struct db_instance *di = (struct db_instance *) &upi->private;
+
+
+ __format_query_db(upi);
/* now we have created our statement, insert it */
+ /* if backup log is not empty we add current query to it */
+ if (!llist_empty(&di->backlog)) {
+ int ret = __add_to_backlog(upi, di->stmt, strlen(di->stmt));
+ if (ret == 0)
+ return __treat_backlog(upi);
+ else {
+ ret = __treat_backlog(upi);
+ if (ret)
+ return ret;
+ /* try adding once the data to backlog */
+ return __add_to_backlog(upi, di->stmt, strlen(di->stmt));
+ }
+ }
+
if (di->driver->execute(upi, di->stmt, strlen(di->stmt)) < 0) {
+ __add_to_backlog(upi, di->stmt, strlen(di->stmt));
/* error occur, database connexion need to be closed */
di->driver->close_db(upi);
return _init_reconnect(upi);