diff options
-rw-r--r-- | input/flow/ulogd_inpflow_NFCT.c | 238 | ||||
-rw-r--r-- | ulogd.conf.in | 2 |
2 files changed, 212 insertions, 28 deletions
diff --git a/input/flow/ulogd_inpflow_NFCT.c b/input/flow/ulogd_inpflow_NFCT.c index a7d5d1f..1390af4 100644 --- a/input/flow/ulogd_inpflow_NFCT.c +++ b/input/flow/ulogd_inpflow_NFCT.c @@ -15,6 +15,7 @@ * * 11 May 2008, Pablo Neira Ayuso <pablo@netfilter.org> * Use a generic hashtable to store the existing flows + * Add netlink overrun handling * * TODO: * - add nanosecond-accurate packet receive timestamp of event-changing @@ -53,9 +54,14 @@ struct ct_timestamp { struct nfct_pluginstance { struct nfct_handle *cth; + struct nfct_handle *ovh; /* overrun handler */ + struct nfct_handle *pgh; /* purge handler */ struct ulogd_fd nfct_fd; + struct ulogd_fd nfct_ov; struct ulogd_timer timer; + struct ulogd_timer ov_timer; /* overrun retry timer */ struct hashtable *ct_active; + int nlbufsiz; /* current netlink buffer size */ }; #define HTABLE_SIZE (8192) @@ -63,7 +69,7 @@ struct nfct_pluginstance { #define EVENT_MASK NF_NETLINK_CONNTRACK_NEW | NF_NETLINK_CONNTRACK_DESTROY static struct config_keyset nfct_kset = { - .num_ces = 5, + .num_ces = 7, .ces = { { .key = "pollinterval", @@ -95,7 +101,18 @@ static struct config_keyset nfct_kset = { .options = CONFIG_OPT_NONE, .u.value = EVENT_MASK, }, - + { + .key = "netlink_socket_buffer_size", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = 0, + }, + { + .key = "netlink_socket_buffer_maxsize", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = 0, + }, }, }; #define pollint_ce(x) (x->ces[0]) @@ -103,6 +120,8 @@ static struct config_keyset nfct_kset = { #define buckets_ce(x) (x->ces[2]) #define maxentries_ce(x) (x->ces[3]) #define eventmask_ce(x) (x->ces[4]) +#define nlsockbufsize_ce(x) (x->ces[5]) +#define nlsockbufmaxsize_ce(x) (x->ces[6]) enum nfct_keys { NFCT_ORIG_IP_SADDR = 0, @@ -529,6 +548,25 @@ static int propagate_ct(struct ulogd_pluginstance *upi, return 0; } +static void +do_propagate_ct(struct ulogd_pluginstance *upi, + struct nf_conntrack *ct, + int type, + struct ct_timestamp *ts) +{ + struct ulogd_pluginstance *npi = NULL; + + /* since we support the re-use of one instance in + * several different stacks, we duplicate the message + * to let them know */ + llist_for_each_entry(npi, &upi->plist, plist) { + if (propagate_ct(npi, ct, type, ts) != 0) + break; + } + + propagate_ct(upi, ct, type, ts); +} + /* XXX: pollinterval needs a different handler */ static int event_handler(enum nf_conntrack_msg_type type, struct nf_conntrack *ct, @@ -541,21 +579,9 @@ static int event_handler(enum nf_conntrack_msg_type type, struct ct_timestamp tmp = { .ct = ct, }; - struct ulogd_pluginstance *npi = NULL; - int ret = 0; if (!usehash_ce(upi->config_kset).u.value && type == NFCT_T_DESTROY) { - /* since we support the re-use of one instance in - * several different stacks, we duplicate the message - * to let them know */ - llist_for_each_entry(npi, &upi->plist, plist) { - ret = propagate_ct(npi, ct, type, ts); - if (ret != 0) - break; - } - - propagate_ct(upi, ct, type, ts); - + do_propagate_ct(upi, ct, type, ts); return NFCT_CB_CONTINUE; } @@ -579,16 +605,7 @@ static int event_handler(enum nf_conntrack_msg_type type, if (ts) gettimeofday(&ts->time[STOP], NULL); - /* since we support the re-use of one instance in - * several different stacks, we duplicate the message - * to let them know */ - llist_for_each_entry(npi, &upi->plist, plist) { - ret = propagate_ct(npi, ct, type, ts); - if (ret != 0) - break; - } - - propagate_ct(upi, ct, type, ts); + do_propagate_ct(upi, ct, type, ts); if (ts) { hashtable_del(cpi->ct_active, ts); @@ -603,24 +620,138 @@ static int event_handler(enum nf_conntrack_msg_type type, return NFCT_CB_CONTINUE; } +static int setnlbufsiz(struct ulogd_pluginstance *upi, int size) +{ + struct nfct_pluginstance *cpi = + (struct nfct_pluginstance *)upi->private; + + if (size < nlsockbufmaxsize_ce(upi->config_kset).u.value) { + cpi->nlbufsiz = nfnl_rcvbufsiz(nfct_nfnlh(cpi->cth), size); + return 1; + } + + ulogd_log(ULOGD_NOTICE, "Maximum buffer size (%d) in NFCT has been " + "reached. Please, consider rising " + "`netlink_socket_buffer_size` and " + "`netlink_socket_buffer_maxsize` " + "clauses.\n", cpi->nlbufsiz); + return 0; +} + static int read_cb_nfct(int fd, unsigned int what, void *param) { struct nfct_pluginstance *cpi = (struct nfct_pluginstance *) param; + struct ulogd_pluginstance *upi = container_of(param, + struct ulogd_pluginstance, + private); if (!(what & ULOGD_FD_READ)) return 0; - /* FIXME: implement this */ - nfct_catch(cpi->cth); + if (nfct_catch(cpi->cth) == -1) { + if (errno == ENOBUFS) { + int family = AF_UNSPEC; + + if (nlsockbufmaxsize_ce(upi->config_kset).u.value) { + int s = cpi->nlbufsiz * 2; + if (setnlbufsiz(upi, s)) { + ulogd_log(ULOGD_NOTICE, + "We are losing events, " + "increasing buffer size " + "to %d\n", cpi->nlbufsiz); + } + } else { + ulogd_log(ULOGD_NOTICE, + "We are losing events. Please, " + "consider using the clauses " + "`netlink_socket_buffer_size' and " + "`netlink_socket_buffer_maxsize\n'"); + } + + nfct_send(cpi->ovh, NFCT_Q_DUMP, &family); + /* TODO: configurable retry timer */ + ulogd_add_timer(&cpi->ov_timer, 2); + } + } + + return 0; +} + +static int do_purge(void *data1, void *data2) +{ + int ret; + struct ulogd_pluginstance *upi = data1; + struct ct_timestamp *ts = data2; + struct nfct_pluginstance *cpi = + (struct nfct_pluginstance *) upi->private; + + /* if it is not in kernel anymore, purge it */ + ret = nfct_query(cpi->pgh, NFCT_Q_GET, ts->ct); + if (ret == -1 && errno == ENOENT) { + do_propagate_ct(upi, ts->ct, NFCT_T_DESTROY, ts); + hashtable_del(cpi->ct_active, ts); + free(ts->ct); + } + + return 0; +} + +static int overrun_handler(enum nf_conntrack_msg_type type, + struct nf_conntrack *ct, + void *data) +{ + struct ulogd_pluginstance *upi = data; + struct nfct_pluginstance *cpi = + (struct nfct_pluginstance *) upi->private; + struct ct_timestamp *ts, tmp = { + .ct = ct, + }; + + /* if it does not exist, add it */ + if (!hashtable_get(cpi->ct_active, &tmp)) { + ts = hashtable_add(cpi->ct_active, &tmp); + gettimeofday(&ts->time[START], NULL); /* do our best here */ + return NFCT_CB_STOLEN; + } + + return NFCT_CB_CONTINUE; +} + +static int read_cb_ovh(int fd, unsigned int what, void *param) +{ + struct nfct_pluginstance *cpi = (struct nfct_pluginstance *) param; + struct ulogd_pluginstance *upi = container_of(param, + struct ulogd_pluginstance, + private); + + if (!(what & ULOGD_FD_READ)) + return 0; + + /* handle the resync request, update our hashtable */ + if (nfct_catch(cpi->ovh) == -1) { + /* enobufs in the overrun buffer? very rare */ + if (errno == ENOBUFS) { + int family = AF_UNSPEC; + + nfct_send(cpi->ovh, NFCT_Q_DUMP, &family); + /* TODO: configurable retry timer */ + ulogd_add_timer(&cpi->ov_timer, 2); + } + } + + /* purge unexistent entries */ + hashtable_iterate(cpi->ct_active, upi, do_purge); + return 0; } static int get_ctr_zero(struct ulogd_pluginstance *upi) { + int family = 0; /* any */ struct nfct_pluginstance *cpi = (struct nfct_pluginstance *)upi->private; - return nfct_dump_conntrack_table_reset_counters(cpi->cth, AF_INET); + return nfct_query(cpi->cth, NFCT_Q_DUMP_RESET, &family); } static void getctr_timer_cb(struct ulogd_timer *t, void *data) @@ -652,6 +783,18 @@ static int configure_nfct(struct ulogd_pluginstance *upi, return 0; } +static void overrun_timeout(struct ulogd_timer *a, void *data) +{ + int family = AF_UNSPEC; + struct ulogd_pluginstance *upi = data; + struct nfct_pluginstance *cpi = + (struct nfct_pluginstance *)upi->private; + + nfct_send(cpi->ovh, NFCT_Q_DUMP, &family); + /* TODO: configurable retry timer */ + ulogd_add_timer(&cpi->ov_timer, 2); +} + static int constructor_nfct(struct ulogd_pluginstance *upi) { struct nfct_pluginstance *cpi = @@ -666,6 +809,28 @@ static int constructor_nfct(struct ulogd_pluginstance *upi) nfct_callback_register(cpi->cth, NFCT_T_ALL, &event_handler, upi); + if (nlsockbufsize_ce(upi->config_kset).u.value) { + setnlbufsiz(upi, nlsockbufsize_ce(upi->config_kset).u.value); + ulogd_log(ULOGD_NOTICE, "NFCT netlink buffer size has been " + "set to %d\n", cpi->nlbufsiz); + } + + cpi->ovh = nfct_open(NFNL_SUBSYS_CTNETLINK, 0); + if (!cpi->ovh) { + ulogd_log(ULOGD_FATAL, "error opening ctnetlink\n"); + return -1; + } + + nfct_callback_register(cpi->ovh, NFCT_T_ALL, &overrun_handler, upi); + + cpi->pgh = nfct_open(NFNL_SUBSYS_CTNETLINK, 0); + if (!cpi->pgh) { + ulogd_log(ULOGD_FATAL, "error opening ctnetlink\n"); + return -1; + } + + ulogd_init_timer(&cpi->ov_timer, upi, overrun_timeout); + cpi->nfct_fd.fd = nfct_fd(cpi->cth); cpi->nfct_fd.cb = &read_cb_nfct; cpi->nfct_fd.data = cpi; @@ -673,6 +838,13 @@ static int constructor_nfct(struct ulogd_pluginstance *upi) ulogd_register_fd(&cpi->nfct_fd); + cpi->nfct_ov.fd = nfct_fd(cpi->ovh); + cpi->nfct_ov.cb = &read_cb_ovh; + cpi->nfct_ov.data = cpi; + cpi->nfct_ov.when = ULOGD_FD_READ; + + ulogd_register_fd(&cpi->nfct_ov); + if (usehash_ce(upi->config_kset).u.value != 0) { cpi->ct_active = hashtable_create(buckets_ce(upi->config_kset).u.value, @@ -683,6 +855,8 @@ static int constructor_nfct(struct ulogd_pluginstance *upi) if (!cpi->ct_active) { ulogd_log(ULOGD_FATAL, "error allocating hash\n"); nfct_close(cpi->cth); + nfct_close(cpi->ovh); + nfct_close(cpi->pgh); return -1; } } @@ -701,6 +875,14 @@ static int destructor_nfct(struct ulogd_pluginstance *pi) if (rc < 0) return rc; + rc = nfct_close(cpi->ovh); + if (rc < 0) + return rc; + + rc = nfct_close(cpi->pgh); + if (rc < 0) + return rc; + return 0; } diff --git a/ulogd.conf.in b/ulogd.conf.in index 4339650..a63026a 100644 --- a/ulogd.conf.in +++ b/ulogd.conf.in @@ -87,6 +87,8 @@ plugin="@libdir@/ulogd/ulogd_raw2packet_BASE.so" #stack=ct1:NFCT,ip2str1:IP2STR,nacct1:NACCT [ct1] +#netlink_socket_buffer_size=217088 +#netlink_socket_buffer_maxsize=1085440 # IPv4 logging through NFLOG [log1] |