summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHolger Eitzenberger <heitzenberger@astaro.com>2009-11-12 19:22:25 +0100
committerHarald Welte <laforge@gnumonks.org>2010-10-21 19:15:39 +0200
commitd51a4a66c00b0c4ee4a153920251b345dd6f533b (patch)
treeffd9dcb954d716c33c5ae5b6529ebeea1b7f60eb
parent3b4a92b5daed8dfbf502f01d253831507a71c1be (diff)
IPFIX: implement proper commit logic
Implement a proper commit logic for any-size MTUs and any-size data records, therefore getting more more flexibility for implementing other templates. Solve it by introducing ipfix_msg, which abstracts away all the housekeeping work for a packet, such as sets and message length. Also prepare for better handling of cases where the peer is not available (restart etc.). Signed-off-by: Holger Eitzenberger <heitzenberger@astaro.com>
-rw-r--r--output/ipfix/ipfix.c107
-rw-r--r--output/ipfix/ipfix.h17
-rw-r--r--output/ipfix/ulogd_output_IPFIX.c135
3 files changed, 208 insertions, 51 deletions
diff --git a/output/ipfix/ipfix.c b/output/ipfix/ipfix.c
index 7babef7..1164fb9 100644
--- a/output/ipfix/ipfix.c
+++ b/output/ipfix/ipfix.c
@@ -10,18 +10,109 @@
#include "ipfix.h"
-size_t
-ipfix_rec_len(uint16_t sid)
+
+struct ipfix_msg *
+ipfix_msg_alloc(size_t len, uint32_t oid)
{
- BUG_ON(sid != htons(VY_IPFIX_SID));
- return sizeof(struct vy_ipfix_data);
+ struct ipfix_msg *msg;
+ struct ipfix_hdr *hdr;
+
+ if (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN)
+ return NULL;
+
+ msg = malloc(sizeof(struct ipfix_msg) + len);
+ memset(msg, 0, sizeof(struct ipfix_msg));
+ msg->tail = msg->data + IPFIX_HDRLEN;
+ msg->end = msg->data + len;
+
+ hdr = ipfix_msg_hdr(msg);
+ memset(hdr, 0, IPFIX_HDRLEN);
+ hdr->version = htons(IPFIX_VERSION);
+ hdr->oid = htonl(oid);
+
+ return msg;
+}
+
+void
+ipfix_msg_free(struct ipfix_msg *msg)
+{
+ if (!msg)
+ return;
+
+ free(msg);
+}
+
+struct ipfix_hdr *
+ipfix_msg_hdr(const struct ipfix_msg *msg)
+{
+ return (struct ipfix_hdr *)msg->data;
+}
+
+void *
+ipfix_msg_data(struct ipfix_msg *msg)
+{
+ return msg->data;
}
size_t
-ipfix_msg_len(const struct ipfix_hdr *hdr)
+ipfix_msg_len(const struct ipfix_msg *msg)
+{
+ return msg->tail - msg->data;
+}
+
+struct ipfix_set_hdr *
+ipfix_msg_add_set(struct ipfix_msg *msg, uint16_t sid)
{
- struct ipfix_set_hdr *shdr = (struct ipfix_set_hdr *)hdr->data;
+ struct ipfix_set_hdr *shdr;
+
+ if (msg->end - msg->tail < IPFIX_SET_HDRLEN)
+ return NULL;
+
+ if (msg->last_set) {
+ shdr->id = htons(shdr->id);
+ shdr->len = htons(shdr->len);
+ }
- /* TODO count all sets */
- return IPFIX_HDRLEN + shdr->len;
+ shdr = (struct ipfix_set_hdr *)msg->tail;
+ shdr->id = sid; /* leave host byte-order */
+ shdr->len = 0;
+ msg->tail += IPFIX_SET_HDRLEN;
+ msg->last_set = shdr;
+ return shdr;
+}
+
+struct ipfix_set_hdr *
+ipfix_msg_get_set(const struct ipfix_msg *msg)
+{
+ return msg->last_set;
+}
+
+/**
+ * Add data record to an IPFIX message. The data is accounted properly.
+ *
+ * @return pointer to data or %NULL if not that much space left.
+ */
+void *
+ipfix_msg_add_data(struct ipfix_msg *msg, size_t len)
+{
+ void *data;
+
+ BUG_ON(!msg->last_set);
+ if (len > msg->end - msg->tail)
+ return NULL;
+
+ data = msg->tail;
+ msg->tail += len;
+ msg->nrecs++;
+ msg->last_set->len += len;
+
+ return data;
+}
+
+/* template management */
+size_t
+ipfix_rec_len(uint16_t sid)
+{
+ BUG_ON(sid != htons(VY_IPFIX_SID));
+ return sizeof(struct vy_ipfix_data);
}
diff --git a/output/ipfix/ipfix.h b/output/ipfix/ipfix.h
index cd51b2b..38b174d 100644
--- a/output/ipfix/ipfix.h
+++ b/output/ipfix/ipfix.h
@@ -40,6 +40,15 @@ struct ipfix_set_hdr {
#define IPFIX_SET_HDRLEN sizeof(struct ipfix_set_hdr)
+struct ipfix_msg {
+ struct llist_head link;
+ uint8_t *tail;
+ uint8_t *end;
+ unsigned nrecs;
+ struct ipfix_set_hdr *last_set;
+ uint8_t data[];
+};
+
struct vy_ipfix_data {
struct sockaddr_in saddr;
struct sockaddr_in daddr;
@@ -67,6 +76,12 @@ struct vy_ipfix_data {
size_t ipfix_rec_len(uint16_t);
/* message handling */
-size_t ipfix_msg_len(const struct ipfix_hdr *);
+struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t);
+void ipfix_msg_free(struct ipfix_msg *);
+struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *);
+size_t ipfix_msg_len(const struct ipfix_msg *);
+void *ipfix_msg_data(struct ipfix_msg *);
+struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *, uint16_t);
+void *ipfix_msg_add_data(struct ipfix_msg *, size_t);
#endif /* IPFIX_H */
diff --git a/output/ipfix/ulogd_output_IPFIX.c b/output/ipfix/ulogd_output_IPFIX.c
index 1e55d34..50f3400 100644
--- a/output/ipfix/ulogd_output_IPFIX.c
+++ b/output/ipfix/ulogd_output_IPFIX.c
@@ -37,15 +37,17 @@ enum {
HOST_CE,
PORT_CE,
PROTO_CE,
+ MTU_CE,
};
static const struct config_keyset ipfix_kset = {
- .num_ces = 4,
+ .num_ces = 5,
.ces = {
[OID_CE] = CONFIG_KEY_INT("oid", 0),
[HOST_CE] = CONFIG_KEY_STR("host", ""),
[PORT_CE] = CONFIG_KEY_INT("port", DEFAULT_PORT ),
[PROTO_CE] = CONFIG_KEY_STR("proto", "tcp"),
+ [MTU_CE] = CONFIG_KEY_INT("mtu", DEFAULT_MTU),
},
};
@@ -53,6 +55,7 @@ static const struct config_keyset ipfix_kset = {
#define host_ce(pi) ulogd_config_str(pi, HOST_CE)
#define port_ce(pi) ulogd_config_int(pi, PORT_CE)
#define proto_ce(pi) ulogd_config_str(pi, PROTO_CE)
+#define mtu_ce(pi) ulogd_config_int(pi, MTU_CE)
struct ipfix_templ {
@@ -67,10 +70,8 @@ struct ipfix_flow {
struct ipfix_priv {
struct ulogd_fd ufd;
uint32_t seqno;
- struct ipfix_hdr *hdr;
- char *end;
- int num_flows;
- int set_len;
+ struct ipfix_msg *msg; /* current message */
+ struct llist_head list;
struct ipfix_templ *templates;
int proto;
struct ulogd_timer timer;
@@ -80,8 +81,6 @@ struct ipfix_priv {
enum {
InIpSaddr = 0,
InIpDaddr,
- /* InOobIfiIn, */
- /* InOobIfiOut, */
InRawInPktCount,
InRawInPktLen,
InRawOutPktCount,
@@ -99,8 +98,6 @@ enum {
static struct ulogd_key ipfix_in_keys[] = {
[InIpSaddr] = KEY(IPADDR, "ip.saddr"),
[InIpDaddr] = KEY(IPADDR, "ip.daddr"),
- /* [InOobIfiIn] = KEY(UINT32, "oob.ifindex_in"), */
- /* [InOobIfiOut] = KEY(UINT32, "oob.ifindex_out"), */
[InRawInPktCount] = KEY(UINT64, "raw.in.pktcount"),
[InRawInPktLen] = KEY(UINT64, "raw.in.pktlen"),
[InRawOutPktCount] = KEY(UINT64, "raw.out.pktcount"),
@@ -149,6 +146,7 @@ static int
ipfix_configure(struct ulogd_pluginstance *pi)
{
struct ipfix_priv *priv = upi_priv(pi);
+ char addr[16];
int ret;
if (!oid_ce(pi)) {
@@ -176,10 +174,16 @@ ipfix_configure(struct ulogd_pluginstance *pi)
return ULOGD_IRET_ERR;
}
+ INIT_LLIST_HEAD(&priv->list);
+
ulogd_init_fd(&priv->ufd, -1, ULOGD_FD_READ, ipfix_ufd_cb, pi);
ulogd_init_timer(&priv->timer, 1 SEC, ipfix_timer_cb, pi,
TIMER_F_PERIODIC);
+ upi_log(pi, ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
+ inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
+ port_ce(pi), mtu_ce(pi));
+
return ULOGD_IRET_OK;
}
@@ -229,9 +233,7 @@ static int
ipfix_start(struct ulogd_pluginstance *pi)
{
struct ipfix_priv *priv = upi_priv(pi);
- struct ipfix_hdr *hdr;
struct ipfix_set_hdr *shdr;
- struct vy_ipfix_data *data;
char addr[16];
int ret;
@@ -258,17 +260,12 @@ ipfix_start(struct ulogd_pluginstance *pi)
ulogd_register_timer(&priv->timer);
- if ((hdr = priv->hdr = malloc(VY_IPFIX_PKT_LEN)) == NULL) {
+ if ((priv->msg = ipfix_msg_alloc(mtu_ce(pi), oid_ce(pi))) == NULL) {
upi_log(pi, ULOGD_ERROR, "out of memory\n");
return -1;
}
- memset(hdr, 0, IPFIX_HDRLEN + IPFIX_SET_HDRLEN);
- shdr = (struct ipfix_set_hdr *)hdr->data;
- data = (struct vy_ipfix_data *)shdr->data;
- hdr->version = htons(IPFIX_VERSION);
- hdr->oid = htonl(oid_ce(pi));
- shdr->id = htons(VY_IPFIX_SID);
+ ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
return ULOGD_IRET_OK;
}
@@ -281,32 +278,97 @@ ipfix_stop(struct ulogd_pluginstance *pi)
close(priv->ufd.fd);
priv->ufd.fd = -1;
- free(priv->hdr);
+ ipfix_msg_free(priv->msg);
+ priv->msg = NULL;
+
+ return 0;
+}
+
+/* do some polishing and enqueue it */
+static int
+enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
+{
+ struct ipfix_hdr *hdr = ipfix_msg_data(priv->msg);
+
+ hdr->time = htonl(time(NULL));
+ hdr->seqno = htonl(priv->seqno += msg->nrecs);
+ if (msg->last_set) {
+ msg->last_set->id = htons(msg->last_set->id);
+ msg->last_set->len = htons(msg->last_set->len);
+ msg->last_set = NULL;
+ }
+ hdr->len = htons(ipfix_msg_len(msg));
+
+ llist_add(&priv->msg->link, &priv->list);
+ priv->msg = NULL;
return 0;
}
static int
+send_msgs(struct ulogd_pluginstance *pi)
+{
+ struct ipfix_priv *priv = upi_priv(pi);
+ struct llist_head *curr, *tmp;
+
+ if (llist_empty(&priv->list))
+ return 0;
+
+ llist_for_each_prev_safe(curr, tmp, &priv->list) {
+ struct ipfix_msg *msg = llist_entry(curr, struct ipfix_msg, link);
+ int ret;
+
+ ret = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
+ if (ret < 0) {
+ upi_log(pi, ULOGD_ERROR, "send: %m\n");
+ return ULOGD_IRET_ERR;
+ }
+
+ /* TODO handle short send() for other protocols */
+ if (ret < ipfix_msg_len(msg)) {
+ upi_log(pi, ULOGD_ERROR, "short send: %d < %d\n",
+ ret, ipfix_msg_len(msg));
+ return ULOGD_IRET_ERR;
+ }
+
+ llist_del(curr);
+ }
+
+ return ULOGD_IRET_OK;
+}
+
+static int
ipfix_interp(struct ulogd_pluginstance *pi, unsigned *flags)
{
struct ipfix_priv *priv = upi_priv(pi);
struct ulogd_key *in = pi->input.keys;
- struct ipfix_hdr *hdr = priv->hdr;
- struct ipfix_set_hdr *shdr;
+ struct ipfix_msg *msg = priv->msg;
struct vy_ipfix_data *data;
int ret;
- BUG_ON(priv->num_flows < 0 || priv->num_flows >= 2);
- BUG_ON(!hdr);
- shdr = (struct ipfix_set_hdr *)hdr->data;
- data = (struct vy_ipfix_data *)shdr->data + priv->num_flows;
-
+ BUG_ON(!msg);
if (!key_src_valid(&in[InIpSaddr]))
return ULOGD_IRET_OK;
+again:
+ data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
+ if (!data) {
+ enqueue_msg(priv, msg);
+
+ priv->msg = ipfix_msg_alloc(mtu_ce(pi), oid_ce(pi));
+ if (!priv->msg) {
+ /* just drop this flow */
+ upi_log(pi, ULOGD_ERROR, "out of memory, dropping flow\n");
+ return ULOGD_IRET_OK;
+ }
+ ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
+ /* can't loop because we know the next will succeed */
+ goto again;
+ }
+
key_src_in(&in[InIpSaddr], &data->saddr.sin_addr);
key_src_in(&in[InIpDaddr], &data->daddr.sin_addr);
- data->ifi_in = data->ifi_out = htons(1);
+ data->ifi_in = data->ifi_out = 0;
data->packets = htonl((uint32_t)(key_src_u64(&in[InRawInPktCount])
+ key_src_u64(&in[InRawOutPktCount])));
@@ -327,23 +389,12 @@ ipfix_interp(struct ulogd_pluginstance *pi, unsigned *flags)
data->aid = 0;
data->l4_proto = key_src_u8(&in[InIpProto]);
data->dscp = 0;
-
- priv->set_len += ipfix_rec_len(htons(VY_IPFIX_SID));
-
- if (++priv->num_flows >= 2) {
- hdr->time = htonl(time(NULL));
- hdr->seqno = htonl(++priv->seqno);
- shdr->len = htons(priv->set_len);
- hdr->len = htons(IPFIX_HDRLEN + htons(shdr->len));
+ data->__padding = 0;
- ret = send(priv->ufd.fd, priv->hdr, IPFIX_HDRLEN + priv->set_len, 0);
+ if ((ret = send_msgs(pi)) < 0)
+ return ret;
- priv->num_flows = 0;
- priv->set_len = 0;
- shdr->len = 0;
- }
-
- return 0;
+ return ULOGD_IRET_OK;
}
static struct ulogd_plugin ipfix_plugin = {