diff options
Diffstat (limited to 'output/sqlite3')
-rw-r--r-- | output/sqlite3/ulogd_output_SQLITE3.c | 609 |
1 files changed, 350 insertions, 259 deletions
diff --git a/output/sqlite3/ulogd_output_SQLITE3.c b/output/sqlite3/ulogd_output_SQLITE3.c index ba1b594..0de8145 100644 --- a/output/sqlite3/ulogd_output_SQLITE3.c +++ b/output/sqlite3/ulogd_output_SQLITE3.c @@ -1,4 +1,3 @@ -#if 0 /* * ulogd output plugin for logging to a SQLITE database * @@ -26,6 +25,9 @@ * * 2005-02-09 Harald Welte <laforge@gnumonks.org>: * - port to ulogd-1.20 + * + * 2006-10-09 Holger Eitzenberger <holger@my-eitzenberger.de> + * - port to ulogd-2.00 */ #include <stdlib.h> @@ -34,381 +36,470 @@ #include <ulogd/ulogd.h> #include <ulogd/conffile.h> #include <sqlite3.h> +#include <sys/queue.h> + +#define CFG_BUFFER_DEFAULT 10 + +/* number of colums we have (really should be configurable) */ +#define DB_NUM_COLS 11 -#ifdef DEBUG_SQLITE3 +#if 0 #define DEBUGP(x, args...) fprintf(stderr, x, ## args) #else #define DEBUGP(x, args...) #endif -struct _field { +struct field { + TAILQ_ENTRY(field) link; char name[ULOGD_MAX_KEYLEN]; - unsigned int id; - struct _field *next; + struct ulogd_key *key; }; -/* the database handle we are using */ -static sqlite3 *dbh; +TAILQ_HEAD(field_lh, field); + +#define tailq_for_each(pos, head, link) \ + for (pos = (head).tqh_first; pos != NULL; pos = pos->link.tqe_next) -/* a linked list of the fields the table has */ -static struct _field *fields; -/* buffer for our insert statement */ -static char *stmt; +struct sqlite3_priv { + sqlite3 *dbh; /* database handle we are using */ + struct field_lh fields; + char *stmt; + sqlite3_stmt *p_stmt; + int buffer_size; + int buffer_curr; + struct { + unsigned err_tbl_busy; /* "Table busy" */ + } stats; +}; -/* pointer to the final prepared statement */ -static sqlite3_stmt *p_stmt; +static struct config_keyset sqlite3_kset = { + .num_ces = 3, + .ces = { + { + .key = "db", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "table", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "buffer", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = CFG_BUFFER_DEFAULT, + }, + }, +}; -/* number of statements to buffer before we commit */ -static int buffer_size; +#define db_ce(pi) (pi)->config_kset->ces[0].u.string +#define table_ce(pi) (pi)->config_kset->ces[1].u.string +#define buffer_ce(pi) (pi)->config_kset->ces[2].u.value -/* number of statements currently in the buffer */ -static int buffer_ctr; +/* forward declarations */ +static int sqlite3_createstmt(struct ulogd_pluginstance *); -/* our configuration directives */ -static config_entry_t db_ce = { - .key = "db", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, -}; -static config_entry_t table_ce = { - .next = &db_ce, - .key = "table", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, -}; +static int +add_row(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + int ret; + + ret = sqlite3_step(priv->p_stmt); + if (ret == SQLITE_DONE) + priv->buffer_curr++; + else if (ret == SQLITE_BUSY) + priv->stats.err_tbl_busy++; + else if (ret == SQLITE_ERROR) { + ret = sqlite3_finalize(priv->p_stmt); + priv->p_stmt = NULL; + + if (ret == SQLITE_SCHEMA) + sqlite3_createstmt(pi); + else { + ulogd_log(ULOGD_ERROR, "SQLITE3: step: %s\n", + sqlite3_errmsg(priv->dbh)); + goto err_reset; + } + } + + ret = sqlite3_reset(priv->p_stmt); + + return 0; + + err_reset: + sqlite3_reset(priv->p_stmt); + + return -1; +} -static config_entry_t buffer_ce = { - .next = &table_ce, - .key = "buffer", - .type = CONFIG_TYPE_INT, - .options = CONFIG_OPT_MANDATORY, -}; /* our main output function, called by ulogd */ -static int _sqlite3_output(ulog_iret_t *result) +static int +sqlite3_interp(struct ulogd_pluginstance *pi) { - struct _field *f; - ulog_iret_t *res; - int col_counter; -#ifdef IP_AS_STRING - char *ipaddr; - struct in_addr addr; -#endif + struct sqlite3_priv *priv = (void *)pi->private; + struct field *f; + int ret, i = 1; - col_counter = 1; - for (f = fields; f; f = f->next) { - res = keyh_getres(f->id); + tailq_for_each(f, priv->fields, link) { + struct ulogd_key *k_ret = f->key->u.source; - if (!res) { - ulogd_log(ULOGD_NOTICE, - "no result for %s ?!?\n", f->name); + if (f->key == NULL || !IS_VALID(*k_ret)) { + sqlite3_bind_null(priv->p_stmt, i); + goto next_field; } + + switch (f->key->type) { + case ULOGD_RET_INT8: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.i8); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_INT16: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.i16); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_INT32: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.i32); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_INT64: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.i64); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_UINT8: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.ui8); + if (ret != SQLITE_OK) + goto err_bind; + break; - if (!res || !IS_VALID((*res))) { - /* no result, pass a null */ - sqlite3_bind_null(p_stmt, col_counter); - col_counter++; - continue; + case ULOGD_RET_UINT16: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.ui16); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_UINT32: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.ui32); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_IPADDR: + case ULOGD_RET_UINT64: + ret = sqlite3_bind_int64(priv->p_stmt, i, k_ret->u.value.ui64); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_BOOL: + ret = sqlite3_bind_int(priv->p_stmt, i, k_ret->u.value.b); + if (ret != SQLITE_OK) + goto err_bind; + break; + + case ULOGD_RET_STRING: + ret = sqlite3_bind_text(priv->p_stmt, i, k_ret->u.value.ptr, + strlen(k_ret->u.value.ptr), SQLITE_STATIC); + if (ret != SQLITE_OK) + goto err_bind; + break; + + default: + ulogd_log(ULOGD_NOTICE, "unknown type %d for %s\n", + f->key->type, f->key->name); } - - switch (res->type) { - case ULOGD_RET_INT8: - sqlite3_bind_int(p_stmt,col_counter,res->value.i8); - break; - case ULOGD_RET_INT16: - sqlite3_bind_int(p_stmt,col_counter,res->value.i16); - break; - case ULOGD_RET_INT32: - sqlite3_bind_int(p_stmt,col_counter,res->value.i32); - break; - case ULOGD_RET_INT64: - sqlite3_bind_int64(p_stmt,col_counter,res->value.i64); - break; - case ULOGD_RET_UINT8: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui8); - break; - case ULOGD_RET_UINT16: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui16); - break; - case ULOGD_RET_IPADDR: -#ifdef IP_AS_STRING - memset(&addr, 0, sizeof(addr)); - addr.s_addr = ntohl(res->value.ui32); - ipaddr = inet_ntoa(addr); - sqlite3_bind_text(p_stmt,col_counter,ipaddr,strlen(ipaddr),SQLITE_STATIC); - break; -#endif /* IP_AS_STRING */ - /* EVIL: fallthrough when logging IP as u_int32_t */ - case ULOGD_RET_UINT32: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui32); - break; - case ULOGD_RET_UINT64: - sqlite3_bind_int64(p_stmt,col_counter,res->value.ui64); - break; - case ULOGD_RET_BOOL: - sqlite3_bind_int(p_stmt,col_counter,res->value.b); - break; - case ULOGD_RET_STRING: - sqlite3_bind_text(p_stmt,col_counter,res->value.ptr,strlen(res->value.ptr),SQLITE_STATIC); - break; - default: - ulogd_log(ULOGD_NOTICE, - "unknown type %d for %s\n", - res->type, res->key); - break; - } - - col_counter++; - } - - /* now we have created our statement, insert it */ - if (sqlite3_step(p_stmt) == SQLITE_DONE) { - sqlite3_reset(p_stmt); - buffer_ctr++; - } else { - ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", - sqlite3_errmsg(dbh)); - return 1; + next_field: + i++; } - /* commit all of the inserts to the database, ie flush buffer */ - if (buffer_ctr >= buffer_size) { - if (sqlite3_exec(dbh,"commit",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to commit records to db."); - - if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to begin a new transaction."); + if (add_row(pi) < 0) + return ULOGD_IRET_ERR; - buffer_ctr = 0; - DEBUGP("committing.\n"); - } + return ULOGD_IRET_OK; - return 0; + err_bind: + ulogd_log(ULOGD_ERROR, "SQLITE: bind: %s\n", sqlite3_errmsg(priv->dbh)); + + return ULOGD_IRET_ERR; } #define _SQLITE3_INSERTTEMPL "insert into X (Y) values (Z)" /* create the static part of our insert statement */ -static int _sqlite3_createstmt(void) +static int +sqlite3_createstmt(struct ulogd_pluginstance *pi) { - struct _field *f; - unsigned int size; + struct sqlite3_priv *priv = (void *)pi->private; + struct field *f; char buf[ULOGD_MAX_KEYLEN]; char *underscore; char *stmt_pos; - int col_count; - int i; - - if (stmt) { - ulogd_log(ULOGD_NOTICE, "createstmt called, but stmt" - " already existing\n"); - return 1; - } - - /* caclulate the size for the insert statement */ - size = strlen(_SQLITE3_INSERTTEMPL) + strlen(table_ce.u.string); + int i, cols = 0; - DEBUGP("initial size: %u\n", size); + if (priv->stmt != NULL) + free(priv->stmt); - col_count = 0; - for (f = fields; f; f = f->next) { - /* we need space for the key and a comma, and a ? */ - size += strlen(f->name) + 3; - DEBUGP("size is now %u since adding %s\n",size,f->name); - col_count++; + if ((priv->stmt = calloc(1, 1024)) == NULL) { + ulogd_log(ULOGD_ERROR, "SQLITE3: out of memory\n"); + return -1; } - DEBUGP("there were %d columns\n",col_count); - DEBUGP("after calc name length: %u\n",size); + sprintf(priv->stmt, "insert into %s (", table_ce(pi)); + stmt_pos = priv->stmt + strlen(priv->stmt); - ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size); + tailq_for_each(f, priv->fields, link) { + strncpy(buf, f->name, ULOGD_MAX_KEYLEN); - stmt = (char *) malloc(size); - - if (!stmt) { - ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; - } - - sprintf(stmt, "insert into %s (", table_ce.u.string); - stmt_pos = stmt + strlen(stmt); - - for (f = fields; f; f = f->next) { - strncpy(buf, f->name, ULOGD_MAX_KEYLEN); while ((underscore = strchr(buf, '.'))) *underscore = '_'; + sprintf(stmt_pos, "%s,", buf); - stmt_pos = stmt + strlen(stmt); + stmt_pos = priv->stmt + strlen(priv->stmt); + + cols++; } *(stmt_pos - 1) = ')'; sprintf(stmt_pos, " values ("); - stmt_pos = stmt + strlen(stmt); + stmt_pos = priv->stmt + strlen(priv->stmt); - for (i = 0; i < col_count - 1; i++) { + for (i = 0; i < cols - 1; i++) { sprintf(stmt_pos,"?,"); stmt_pos += 2; } sprintf(stmt_pos, "?)"); - ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", stmt); + ulogd_log(ULOGD_DEBUG, "%s: stmt='%s'\n", pi->id, priv->stmt); DEBUGP("about to prepare statement.\n"); - sqlite3_prepare(dbh,stmt,-1,&p_stmt,0); + sqlite3_prepare(priv->dbh, priv->stmt, -1, &priv->p_stmt, 0); + if (priv->p_stmt == NULL) { + ulogd_log(ULOGD_ERROR, "SQLITE3: prepare: %s\n", + sqlite3_errmsg(priv->dbh)); + return 1; + } DEBUGP("statement prepared.\n"); - if (!p_stmt) { - ulogd_log(ULOGD_ERROR,"unable to prepare statement"); - return 1; + return 0; +} + + +static struct ulogd_key * +ulogd_find_key(struct ulogd_pluginstance *pi, const char *name) +{ + int i; + + for (i = 0; i < pi->input.num_keys; i++) { + if (strcmp(pi->input.keys[i].name, name) == 0) + return &pi->input.keys[i]; + } + + return NULL; +} + +#define SELECT_ALL_STR "select * from " +#define SELECT_ALL_LEN sizeof(SELECT_ALL_STR) + +static int +db_count_cols(struct ulogd_pluginstance *pi, sqlite3_stmt **stmt) +{ + struct sqlite3_priv *priv = (void *)pi->private; + char query[SELECT_ALL_LEN + CONFIG_VAL_STRING_LEN] = SELECT_ALL_STR; + + strncat(query, table_ce(pi), LINE_LEN); + + if (sqlite3_prepare(priv->dbh, query, -1, stmt, 0) != SQLITE_OK) + return -1; + + return sqlite3_column_count(*stmt); +} + + +/* FIXME make this configurable */ +#define SQL_CREATE_STR \ + "create table daily(ip_saddr integer, ip_daddr integer, " \ + "ip_protocol integer, l4_dport integer, raw_in_pktlen integer, " \ + "raw_in_pktcount integer, raw_out_pktlen integer, " \ + "raw_out_pktcount integer, flow_start_day integer, " \ + "flow_start_sec integer, flow_duration integer)" + +static int +db_create_tbl(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + char *errmsg; + int ret; + + sqlite3_exec(priv->dbh, "drop table daily", NULL, NULL, NULL); + + ret = sqlite3_exec(priv->dbh, SQL_CREATE_STR, NULL, NULL, &errmsg); + if (ret != SQLITE_OK) { + ulogd_log(ULOGD_ERROR, "SQLITE3: create table: %s\n", errmsg); + sqlite3_free(errmsg); + + return -1; } return 0; } -/* length of "select * from \0" */ -#define SQLITE_SELECT_LEN 15 -/* find out which columns the table has */ -static int _sqlite3_get_columns(const char *table) +/* initialize DB, possibly creating it */ +static int +sqlite3_init_db(struct ulogd_pluginstance *pi) { + struct sqlite3_priv *priv = (void *)pi->private; char buf[ULOGD_MAX_KEYLEN]; - char query[SQLITE_SELECT_LEN + CONFIG_VAL_STRING_LEN] = "select * from \0"; char *underscore; - struct _field *f; + struct field *f; sqlite3_stmt *schema_stmt; - int column; - int result; - int id; + int col, num_cols; - if (!dbh) - return 1; + if (priv->dbh == NULL) + return -1; - strncat(query,table,LINE_LEN); - - result = sqlite3_prepare(dbh,query,-1,&schema_stmt,0); - - if (result != SQLITE_OK) - return 1; + num_cols = db_count_cols(pi, &schema_stmt); + if (num_cols != DB_NUM_COLS) { + if (db_create_tbl(pi) < 0) + return -1; - for (column = 0; column < sqlite3_column_count(schema_stmt); column++) { - /* replace all underscores with dots */ - strncpy(buf, sqlite3_column_name(schema_stmt,column), ULOGD_MAX_KEYLEN); - while ((underscore = strchr(buf, '_'))) - *underscore = '.'; + num_cols = db_count_cols(pi, &schema_stmt); + } - DEBUGP("field '%s' found: ", buf); + for (col = 0; col < num_cols; col++) { + strncpy(buf, sqlite3_column_name(schema_stmt, col), ULOGD_MAX_KEYLEN); - if (!(id = keyh_getid(buf))) { - DEBUGP(" no keyid!\n"); - continue; - } + /* replace all underscores with dots */ + while ((underscore = strchr(buf, '_')) != NULL) + *underscore = '.'; - DEBUGP("keyid %u\n", id); + DEBUGP("field '%s' found\n", buf); /* prepend it to the linked list */ - f = (struct _field *) malloc(sizeof *f); - if (!f) { - ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; + if ((f = calloc(1, sizeof(struct field))) == NULL) { + ulogd_log(ULOGD_ERROR, "SQLITE3: out of memory\n"); + return -1; } strncpy(f->name, buf, ULOGD_MAX_KEYLEN); - f->id = id; - f->next = fields; - fields = f; + + if ((f->key = ulogd_find_key(pi, buf)) == NULL) + return -1; + + TAILQ_INSERT_TAIL(&priv->fields, f, link); } sqlite3_finalize(schema_stmt); + return 0; } -/** - * make connection and select database - * returns 0 if database failed to open. - */ -static int _sqlite3_open_db(char *db_file) -{ - DEBUGP("opening database.\n"); - return sqlite3_open(db_file,&dbh); -} +#define SQLITE3_BUSY_TIMEOUT 300 -/* give us an opportunity to close the database down properly */ -static void _sqlite3_fini(void) +static int +sqlite3_configure(struct ulogd_pluginstance *pi, + struct ulogd_pluginstance_stack *stack) { - DEBUGP("cleaning up db connection\n"); + /* struct sqlite_priv *priv = (void *)pi->private; */ - /* free up our prepared statements so we can close the db */ - if (p_stmt) { - sqlite3_finalize(p_stmt); - DEBUGP("prepared statement finalized\n"); - } + config_parse_file(pi->id, pi->config_kset); - if (dbh) { - int result; - /* flush the remaining insert statements to the database. */ - result = sqlite3_exec(dbh,"commit",NULL,NULL,NULL); + if (ulogd_wildcard_inputkeys(pi) < 0) + return -1; - if (result != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to commit remaining records to db."); + DEBUGP("%s: db='%s' table='%s'\n", pi->id, db_ce(pi), table_ce(pi)); - sqlite3_close(dbh); - DEBUGP("database file closed\n"); - } + return 0; } -#define _SQLITE3_BUSY_TIMEOUT 300 - -static int _sqlite3_init(void) +static int +sqlite3_start(struct ulogd_pluginstance *pi) { - /* have the opts parsed */ - config_parse_file("SQLITE3", &buffer_ce); + struct sqlite3_priv *priv = (void *)pi->private; - if (_sqlite3_open_db(db_ce.u.string)) { - ulogd_log(ULOGD_ERROR, "can't open the database file\n"); - return 1; + TAILQ_INIT(&priv->fields); + + if (sqlite3_open(db_ce(pi), &priv->dbh) != SQLITE_OK) { + ulogd_log(ULOGD_ERROR, "SQLITE3: %s\n", sqlite3_errmsg(priv->dbh)); + return -1; } /* set the timeout so that we don't automatically fail - * if the table is busy. */ - sqlite3_busy_timeout(dbh, _SQLITE3_BUSY_TIMEOUT); + if the table is busy */ + sqlite3_busy_timeout(priv->dbh, SQLITE3_BUSY_TIMEOUT); /* read the fieldnames to know which values to insert */ - if (_sqlite3_get_columns(table_ce.u.string)) { - ulogd_log(ULOGD_ERROR, "unable to get sqlite columns\n"); - return 1; - } + if (sqlite3_init_db(pi) < 0) + return -1; /* initialize our buffer size and counter */ - buffer_size = buffer_ce.u.value; - buffer_ctr = 0; + priv->buffer_size = buffer_ce(pi); + priv->buffer_curr = 0; - DEBUGP("Have a buffer size of : %d\n", buffer_size); + /* create and prepare the actual insert statement */ + sqlite3_createstmt(pi); - if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"can't create a new transaction\n"); + return 0; +} - /* create and prepare the actual insert statement */ - _sqlite3_createstmt(); +/* give us an opportunity to close the database down properly */ +static int +sqlite3_stop(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + /* free up our prepared statements so we can close the db */ + if (priv->p_stmt) { + sqlite3_finalize(priv->p_stmt); + DEBUGP("prepared statement finalized\n"); + } + + if (priv->dbh == NULL) + return -1; + + sqlite3_close(priv->dbh); return 0; } -static ulog_output_t _sqlite3_plugin = { - .name = "sqlite3", - .output = &_sqlite3_output, - .init = &_sqlite3_init, - .fini = &_sqlite3_fini, +static struct ulogd_plugin sqlite3_plugin = { + .name = "SQLITE3", + .input = { + .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW, + }, + .output = { + .type = ULOGD_DTYPE_SINK, + }, + .config_kset = &sqlite3_kset, + .priv_size = sizeof(struct sqlite3_priv), + .configure = sqlite3_configure, + .start = sqlite3_start, + .stop = sqlite3_stop, + .interp = sqlite3_interp, + .version = ULOGD_VERSION, }; -void _init(void) +static void init(void) __attribute__((constructor)); + +static void +init(void) { - register_output(&_sqlite3_plugin); + ulogd_register_plugin(&sqlite3_plugin); } - -#endif |