summaryrefslogtreecommitdiffstats
path: root/input
diff options
context:
space:
mode:
Diffstat (limited to 'input')
-rw-r--r--input/flow/ulogd_inpflow_NFCT.c238
1 files changed, 210 insertions, 28 deletions
diff --git a/input/flow/ulogd_inpflow_NFCT.c b/input/flow/ulogd_inpflow_NFCT.c
index a7d5d1f..1390af4 100644
--- a/input/flow/ulogd_inpflow_NFCT.c
+++ b/input/flow/ulogd_inpflow_NFCT.c
@@ -15,6 +15,7 @@
*
* 11 May 2008, Pablo Neira Ayuso <pablo@netfilter.org>
* Use a generic hashtable to store the existing flows
+ * Add netlink overrun handling
*
* TODO:
* - add nanosecond-accurate packet receive timestamp of event-changing
@@ -53,9 +54,14 @@ struct ct_timestamp {
struct nfct_pluginstance {
struct nfct_handle *cth;
+ struct nfct_handle *ovh; /* overrun handler */
+ struct nfct_handle *pgh; /* purge handler */
struct ulogd_fd nfct_fd;
+ struct ulogd_fd nfct_ov;
struct ulogd_timer timer;
+ struct ulogd_timer ov_timer; /* overrun retry timer */
struct hashtable *ct_active;
+ int nlbufsiz; /* current netlink buffer size */
};
#define HTABLE_SIZE (8192)
@@ -63,7 +69,7 @@ struct nfct_pluginstance {
#define EVENT_MASK NF_NETLINK_CONNTRACK_NEW | NF_NETLINK_CONNTRACK_DESTROY
static struct config_keyset nfct_kset = {
- .num_ces = 5,
+ .num_ces = 7,
.ces = {
{
.key = "pollinterval",
@@ -95,7 +101,18 @@ static struct config_keyset nfct_kset = {
.options = CONFIG_OPT_NONE,
.u.value = EVENT_MASK,
},
-
+ {
+ .key = "netlink_socket_buffer_size",
+ .type = CONFIG_TYPE_INT,
+ .options = CONFIG_OPT_NONE,
+ .u.value = 0,
+ },
+ {
+ .key = "netlink_socket_buffer_maxsize",
+ .type = CONFIG_TYPE_INT,
+ .options = CONFIG_OPT_NONE,
+ .u.value = 0,
+ },
},
};
#define pollint_ce(x) (x->ces[0])
@@ -103,6 +120,8 @@ static struct config_keyset nfct_kset = {
#define buckets_ce(x) (x->ces[2])
#define maxentries_ce(x) (x->ces[3])
#define eventmask_ce(x) (x->ces[4])
+#define nlsockbufsize_ce(x) (x->ces[5])
+#define nlsockbufmaxsize_ce(x) (x->ces[6])
enum nfct_keys {
NFCT_ORIG_IP_SADDR = 0,
@@ -529,6 +548,25 @@ static int propagate_ct(struct ulogd_pluginstance *upi,
return 0;
}
+static void
+do_propagate_ct(struct ulogd_pluginstance *upi,
+ struct nf_conntrack *ct,
+ int type,
+ struct ct_timestamp *ts)
+{
+ struct ulogd_pluginstance *npi = NULL;
+
+ /* since we support the re-use of one instance in
+ * several different stacks, we duplicate the message
+ * to let them know */
+ llist_for_each_entry(npi, &upi->plist, plist) {
+ if (propagate_ct(npi, ct, type, ts) != 0)
+ break;
+ }
+
+ propagate_ct(upi, ct, type, ts);
+}
+
/* XXX: pollinterval needs a different handler */
static int event_handler(enum nf_conntrack_msg_type type,
struct nf_conntrack *ct,
@@ -541,21 +579,9 @@ static int event_handler(enum nf_conntrack_msg_type type,
struct ct_timestamp tmp = {
.ct = ct,
};
- struct ulogd_pluginstance *npi = NULL;
- int ret = 0;
if (!usehash_ce(upi->config_kset).u.value && type == NFCT_T_DESTROY) {
- /* since we support the re-use of one instance in
- * several different stacks, we duplicate the message
- * to let them know */
- llist_for_each_entry(npi, &upi->plist, plist) {
- ret = propagate_ct(npi, ct, type, ts);
- if (ret != 0)
- break;
- }
-
- propagate_ct(upi, ct, type, ts);
-
+ do_propagate_ct(upi, ct, type, ts);
return NFCT_CB_CONTINUE;
}
@@ -579,16 +605,7 @@ static int event_handler(enum nf_conntrack_msg_type type,
if (ts)
gettimeofday(&ts->time[STOP], NULL);
- /* since we support the re-use of one instance in
- * several different stacks, we duplicate the message
- * to let them know */
- llist_for_each_entry(npi, &upi->plist, plist) {
- ret = propagate_ct(npi, ct, type, ts);
- if (ret != 0)
- break;
- }
-
- propagate_ct(upi, ct, type, ts);
+ do_propagate_ct(upi, ct, type, ts);
if (ts) {
hashtable_del(cpi->ct_active, ts);
@@ -603,24 +620,138 @@ static int event_handler(enum nf_conntrack_msg_type type,
return NFCT_CB_CONTINUE;
}
+static int setnlbufsiz(struct ulogd_pluginstance *upi, int size)
+{
+ struct nfct_pluginstance *cpi =
+ (struct nfct_pluginstance *)upi->private;
+
+ if (size < nlsockbufmaxsize_ce(upi->config_kset).u.value) {
+ cpi->nlbufsiz = nfnl_rcvbufsiz(nfct_nfnlh(cpi->cth), size);
+ return 1;
+ }
+
+ ulogd_log(ULOGD_NOTICE, "Maximum buffer size (%d) in NFCT has been "
+ "reached. Please, consider rising "
+ "`netlink_socket_buffer_size` and "
+ "`netlink_socket_buffer_maxsize` "
+ "clauses.\n", cpi->nlbufsiz);
+ return 0;
+}
+
static int read_cb_nfct(int fd, unsigned int what, void *param)
{
struct nfct_pluginstance *cpi = (struct nfct_pluginstance *) param;
+ struct ulogd_pluginstance *upi = container_of(param,
+ struct ulogd_pluginstance,
+ private);
if (!(what & ULOGD_FD_READ))
return 0;
- /* FIXME: implement this */
- nfct_catch(cpi->cth);
+ if (nfct_catch(cpi->cth) == -1) {
+ if (errno == ENOBUFS) {
+ int family = AF_UNSPEC;
+
+ if (nlsockbufmaxsize_ce(upi->config_kset).u.value) {
+ int s = cpi->nlbufsiz * 2;
+ if (setnlbufsiz(upi, s)) {
+ ulogd_log(ULOGD_NOTICE,
+ "We are losing events, "
+ "increasing buffer size "
+ "to %d\n", cpi->nlbufsiz);
+ }
+ } else {
+ ulogd_log(ULOGD_NOTICE,
+ "We are losing events. Please, "
+ "consider using the clauses "
+ "`netlink_socket_buffer_size' and "
+ "`netlink_socket_buffer_maxsize\n'");
+ }
+
+ nfct_send(cpi->ovh, NFCT_Q_DUMP, &family);
+ /* TODO: configurable retry timer */
+ ulogd_add_timer(&cpi->ov_timer, 2);
+ }
+ }
+
+ return 0;
+}
+
+static int do_purge(void *data1, void *data2)
+{
+ int ret;
+ struct ulogd_pluginstance *upi = data1;
+ struct ct_timestamp *ts = data2;
+ struct nfct_pluginstance *cpi =
+ (struct nfct_pluginstance *) upi->private;
+
+ /* if it is not in kernel anymore, purge it */
+ ret = nfct_query(cpi->pgh, NFCT_Q_GET, ts->ct);
+ if (ret == -1 && errno == ENOENT) {
+ do_propagate_ct(upi, ts->ct, NFCT_T_DESTROY, ts);
+ hashtable_del(cpi->ct_active, ts);
+ free(ts->ct);
+ }
+
+ return 0;
+}
+
+static int overrun_handler(enum nf_conntrack_msg_type type,
+ struct nf_conntrack *ct,
+ void *data)
+{
+ struct ulogd_pluginstance *upi = data;
+ struct nfct_pluginstance *cpi =
+ (struct nfct_pluginstance *) upi->private;
+ struct ct_timestamp *ts, tmp = {
+ .ct = ct,
+ };
+
+ /* if it does not exist, add it */
+ if (!hashtable_get(cpi->ct_active, &tmp)) {
+ ts = hashtable_add(cpi->ct_active, &tmp);
+ gettimeofday(&ts->time[START], NULL); /* do our best here */
+ return NFCT_CB_STOLEN;
+ }
+
+ return NFCT_CB_CONTINUE;
+}
+
+static int read_cb_ovh(int fd, unsigned int what, void *param)
+{
+ struct nfct_pluginstance *cpi = (struct nfct_pluginstance *) param;
+ struct ulogd_pluginstance *upi = container_of(param,
+ struct ulogd_pluginstance,
+ private);
+
+ if (!(what & ULOGD_FD_READ))
+ return 0;
+
+ /* handle the resync request, update our hashtable */
+ if (nfct_catch(cpi->ovh) == -1) {
+ /* enobufs in the overrun buffer? very rare */
+ if (errno == ENOBUFS) {
+ int family = AF_UNSPEC;
+
+ nfct_send(cpi->ovh, NFCT_Q_DUMP, &family);
+ /* TODO: configurable retry timer */
+ ulogd_add_timer(&cpi->ov_timer, 2);
+ }
+ }
+
+ /* purge unexistent entries */
+ hashtable_iterate(cpi->ct_active, upi, do_purge);
+
return 0;
}
static int get_ctr_zero(struct ulogd_pluginstance *upi)
{
+ int family = 0; /* any */
struct nfct_pluginstance *cpi =
(struct nfct_pluginstance *)upi->private;
- return nfct_dump_conntrack_table_reset_counters(cpi->cth, AF_INET);
+ return nfct_query(cpi->cth, NFCT_Q_DUMP_RESET, &family);
}
static void getctr_timer_cb(struct ulogd_timer *t, void *data)
@@ -652,6 +783,18 @@ static int configure_nfct(struct ulogd_pluginstance *upi,
return 0;
}
+static void overrun_timeout(struct ulogd_timer *a, void *data)
+{
+ int family = AF_UNSPEC;
+ struct ulogd_pluginstance *upi = data;
+ struct nfct_pluginstance *cpi =
+ (struct nfct_pluginstance *)upi->private;
+
+ nfct_send(cpi->ovh, NFCT_Q_DUMP, &family);
+ /* TODO: configurable retry timer */
+ ulogd_add_timer(&cpi->ov_timer, 2);
+}
+
static int constructor_nfct(struct ulogd_pluginstance *upi)
{
struct nfct_pluginstance *cpi =
@@ -666,6 +809,28 @@ static int constructor_nfct(struct ulogd_pluginstance *upi)
nfct_callback_register(cpi->cth, NFCT_T_ALL, &event_handler, upi);
+ if (nlsockbufsize_ce(upi->config_kset).u.value) {
+ setnlbufsiz(upi, nlsockbufsize_ce(upi->config_kset).u.value);
+ ulogd_log(ULOGD_NOTICE, "NFCT netlink buffer size has been "
+ "set to %d\n", cpi->nlbufsiz);
+ }
+
+ cpi->ovh = nfct_open(NFNL_SUBSYS_CTNETLINK, 0);
+ if (!cpi->ovh) {
+ ulogd_log(ULOGD_FATAL, "error opening ctnetlink\n");
+ return -1;
+ }
+
+ nfct_callback_register(cpi->ovh, NFCT_T_ALL, &overrun_handler, upi);
+
+ cpi->pgh = nfct_open(NFNL_SUBSYS_CTNETLINK, 0);
+ if (!cpi->pgh) {
+ ulogd_log(ULOGD_FATAL, "error opening ctnetlink\n");
+ return -1;
+ }
+
+ ulogd_init_timer(&cpi->ov_timer, upi, overrun_timeout);
+
cpi->nfct_fd.fd = nfct_fd(cpi->cth);
cpi->nfct_fd.cb = &read_cb_nfct;
cpi->nfct_fd.data = cpi;
@@ -673,6 +838,13 @@ static int constructor_nfct(struct ulogd_pluginstance *upi)
ulogd_register_fd(&cpi->nfct_fd);
+ cpi->nfct_ov.fd = nfct_fd(cpi->ovh);
+ cpi->nfct_ov.cb = &read_cb_ovh;
+ cpi->nfct_ov.data = cpi;
+ cpi->nfct_ov.when = ULOGD_FD_READ;
+
+ ulogd_register_fd(&cpi->nfct_ov);
+
if (usehash_ce(upi->config_kset).u.value != 0) {
cpi->ct_active =
hashtable_create(buckets_ce(upi->config_kset).u.value,
@@ -683,6 +855,8 @@ static int constructor_nfct(struct ulogd_pluginstance *upi)
if (!cpi->ct_active) {
ulogd_log(ULOGD_FATAL, "error allocating hash\n");
nfct_close(cpi->cth);
+ nfct_close(cpi->ovh);
+ nfct_close(cpi->pgh);
return -1;
}
}
@@ -701,6 +875,14 @@ static int destructor_nfct(struct ulogd_pluginstance *pi)
if (rc < 0)
return rc;
+ rc = nfct_close(cpi->ovh);
+ if (rc < 0)
+ return rc;
+
+ rc = nfct_close(cpi->pgh);
+ if (rc < 0)
+ return rc;
+
return 0;
}