summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHolger Eitzenberger <heitzenberger@astaro.com>2009-11-13 10:24:02 +0100
committerHarald Welte <laforge@gnumonks.org>2010-10-21 19:15:40 +0200
commit9bc3004f3c0e194afd4604b88c43d40c3d229335 (patch)
treef7c6dc0b5773131868c1e8eba65df01fd3ec1123
parent2c82a9de62637cca89adc82fbd78b41e65b35115 (diff)
IPFIX: implement timer handling
Signed-off-by: Holger Eitzenberger <holger@eitzenberger.org>
-rw-r--r--output/ipfix/ulogd_output_IPFIX.c131
1 files changed, 68 insertions, 63 deletions
diff --git a/output/ipfix/ulogd_output_IPFIX.c b/output/ipfix/ulogd_output_IPFIX.c
index 25ee27b..7b74759 100644
--- a/output/ipfix/ulogd_output_IPFIX.c
+++ b/output/ipfix/ulogd_output_IPFIX.c
@@ -137,9 +137,70 @@ ipfix_ufd_cb(int fd, unsigned what, void *arg)
return 0;
}
+/* do some polishing and enqueue it */
+static int
+enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
+{
+ struct ipfix_hdr *hdr = ipfix_msg_data(msg);
+
+ if (!msg)
+ return 0;
+
+ hdr->time = htonl(time(NULL));
+ hdr->seqno = htonl(priv->seqno += msg->nrecs);
+ if (msg->last_set) {
+ msg->last_set->id = htons(msg->last_set->id);
+ msg->last_set->len = htons(msg->last_set->len);
+ msg->last_set = NULL;
+ }
+ hdr->len = htons(ipfix_msg_len(msg));
+
+ llist_add(&msg->link, &priv->list);
+
+ return 0;
+}
+
+static int
+send_msgs(struct ulogd_pluginstance *pi)
+{
+ struct ipfix_priv *priv = upi_priv(pi);
+ struct llist_head *curr, *tmp;
+
+ llist_for_each_prev_safe(curr, tmp, &priv->list) {
+ struct ipfix_msg *msg = llist_entry(curr, struct ipfix_msg, link);
+ int ret;
+
+ ret = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
+ if (ret < 0) {
+ upi_log(pi, ULOGD_ERROR, "send: %m\n");
+ return ULOGD_IRET_ERR;
+ }
+
+ /* TODO handle short send() for other protocols */
+ if (ret < ipfix_msg_len(msg)) {
+ upi_log(pi, ULOGD_ERROR, "short send: %d < %d\n",
+ ret, ipfix_msg_len(msg));
+ return ULOGD_IRET_ERR;
+ }
+
+ llist_del(curr);
+ }
+
+ return ULOGD_IRET_OK;
+}
+
static void
ipfix_timer_cb(struct ulogd_timer *t)
{
+ struct ulogd_pluginstance *pi = t->data;
+ struct ipfix_priv *priv = upi_priv(pi);
+
+ if (priv->msg && priv->msg->nrecs > 0) {
+ enqueue_msg(priv, priv->msg);
+ priv->msg = NULL;
+
+ send_msgs(pi);
+ }
}
static int
@@ -259,13 +320,6 @@ ipfix_start(struct ulogd_pluginstance *pi)
ulogd_register_timer(&priv->timer);
- if ((priv->msg = ipfix_msg_alloc(mtu_ce(pi), oid_ce(pi))) == NULL) {
- upi_log(pi, ULOGD_ERROR, "out of memory\n");
- return -1;
- }
-
- ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
-
return ULOGD_IRET_OK;
}
@@ -283,74 +337,19 @@ ipfix_stop(struct ulogd_pluginstance *pi)
return 0;
}
-/* do some polishing and enqueue it */
-static int
-enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
-{
- struct ipfix_hdr *hdr = ipfix_msg_data(priv->msg);
-
- hdr->time = htonl(time(NULL));
- hdr->seqno = htonl(priv->seqno += msg->nrecs);
- if (msg->last_set) {
- msg->last_set->id = htons(msg->last_set->id);
- msg->last_set->len = htons(msg->last_set->len);
- msg->last_set = NULL;
- }
- hdr->len = htons(ipfix_msg_len(msg));
-
- llist_add(&priv->msg->link, &priv->list);
- priv->msg = NULL;
-
- return 0;
-}
-
-static int
-send_msgs(struct ulogd_pluginstance *pi)
-{
- struct ipfix_priv *priv = upi_priv(pi);
- struct llist_head *curr, *tmp;
-
- llist_for_each_prev_safe(curr, tmp, &priv->list) {
- struct ipfix_msg *msg = llist_entry(curr, struct ipfix_msg, link);
- int ret;
-
- ret = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
- if (ret < 0) {
- upi_log(pi, ULOGD_ERROR, "send: %m\n");
- return ULOGD_IRET_ERR;
- }
-
- /* TODO handle short send() for other protocols */
- if (ret < ipfix_msg_len(msg)) {
- upi_log(pi, ULOGD_ERROR, "short send: %d < %d\n",
- ret, ipfix_msg_len(msg));
- return ULOGD_IRET_ERR;
- }
-
- llist_del(curr);
- }
-
- return ULOGD_IRET_OK;
-}
-
static int
ipfix_interp(struct ulogd_pluginstance *pi, unsigned *flags)
{
struct ipfix_priv *priv = upi_priv(pi);
struct ulogd_key *in = pi->input.keys;
- struct ipfix_msg *msg = priv->msg;
struct vy_ipfix_data *data;
int ret;
- BUG_ON(!msg);
if (!key_src_valid(&in[InIpSaddr]))
return ULOGD_IRET_OK;
again:
- data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
- if (!data) {
- enqueue_msg(priv, msg);
-
+ if (!priv->msg) {
priv->msg = ipfix_msg_alloc(mtu_ce(pi), oid_ce(pi));
if (!priv->msg) {
/* just drop this flow */
@@ -358,6 +357,12 @@ again:
return ULOGD_IRET_OK;
}
ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
+ }
+
+ data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
+ if (!data) {
+ enqueue_msg(priv, priv->msg);
+ priv->msg = NULL;
/* can't loop because the next will definitely succeed */
goto again;
}