From 6360f319362fd13c86c3387a4bac57665d5ecd73 Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Wed, 23 Sep 2009 18:12:37 +0200 Subject: conntrackd: add retention queue for TCP errors Under stress, the TCP stack may return EAGAIN if there is not space left in the sender buffer. We also enqueue any other error. Signed-off-by: Pablo Neira Ayuso --- src/channel.c | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 4 deletions(-) (limited to 'src/channel.c') diff --git a/src/channel.c b/src/channel.c index c442b0b..818bb01 100644 --- a/src/channel.c +++ b/src/channel.c @@ -13,20 +13,36 @@ #include #include #include +#include +#include "conntrackd.h" #include "channel.h" #include "network.h" +#include "queue.h" static struct channel_ops *ops[CHANNEL_MAX]; extern struct channel_ops channel_mcast; extern struct channel_ops channel_udp; extern struct channel_ops channel_tcp; -void channel_init(void) +static struct queue *errorq; + +int channel_init(void) { ops[CHANNEL_MCAST] = &channel_mcast; ops[CHANNEL_UDP] = &channel_udp; ops[CHANNEL_TCP] = &channel_tcp; + + errorq = queue_create("errorq", CONFIG(channelc).error_queue_length, 0); + if (errorq == NULL) { + return -1; + } + return 0; +} + +void channel_end(void) +{ + queue_destroy(errorq); } struct channel_buffer { @@ -133,9 +149,79 @@ channel_close(struct channel *c) free(c); } +struct channel_error { + char *data; + int len; +}; + +static void channel_enqueue_errors(struct channel *c) +{ + struct queue_object *qobj; + struct channel_error *error; + + qobj = queue_object_new(Q_ELEM_ERR, sizeof(struct channel_error)); + if (qobj == NULL) + return; + + error = (struct channel_error *)qobj->data; + error->len = c->buffer->len; + + error->data = malloc(c->buffer->len); + if (error->data == NULL) { + queue_object_free(qobj); + return; + } + memcpy(error->data, c->buffer->data, c->buffer->len); + if (queue_add(errorq, &qobj->qnode) < 0) { + if (errno == ENOSPC) { + struct queue_node *tail; + struct channel_error *tmp; + + tail = queue_del_head(errorq); + tmp = queue_node_data(tail); + free(tmp->data); + queue_object_free((struct queue_object *)tail); + + queue_add(errorq, &qobj->qnode); + } + } +} + +static int channel_handle_error_step(struct queue_node *n, const void *data2) +{ + struct channel_error *error; + const struct channel *c = data2; + int ret; + + error = queue_node_data(n); + ret = c->ops->send(c->data, error->data, error->len); + if (ret != -1) { + /* Success. Delete it from the error queue. */ + queue_del(n); + free(error->data); + queue_object_free((struct queue_object *)n); + } else { + /* We failed to deliver, give up now, try later. */ + return 1; + } + return 0; +} + +static int channel_handle_errors(struct channel *c) +{ + /* there are pending errors that we have to handle. */ + if (c->channel_flags & CHANNEL_F_ERRORS && queue_len(errorq) > 0) { + queue_iterate(errorq, c, channel_handle_error_step); + return queue_len(errorq) > 0; + } + return 0; +} + int channel_send(struct channel *c, const struct nethdr *net) { - int ret = 0, len = ntohs(net->len); + int ret = 0, len = ntohs(net->len), pending_errors; + + pending_errors = channel_handle_errors(c); if (!(c->channel_flags & CHANNEL_F_BUFFERED)) { c->ops->send(c->data, net, len); @@ -146,7 +232,19 @@ retry: memcpy(c->buffer->data + c->buffer->len, net, len); c->buffer->len += len; } else { - c->ops->send(c->data, c->buffer->data, c->buffer->len); + /* We've got pending packets to deliver, enqueue this + * packet to avoid possible re-ordering. */ + if (pending_errors) { + channel_enqueue_errors(c); + } else { + ret = c->ops->send(c->data, c->buffer->data, + c->buffer->len); + if (ret == -1 && + (c->channel_flags & CHANNEL_F_ERRORS)) { + /* Give it another chance to deliver. */ + channel_enqueue_errors(c); + } + } ret = 1; c->buffer->len = 0; goto retry; @@ -156,10 +254,23 @@ retry: int channel_send_flush(struct channel *c) { + int ret, pending_errors; + + pending_errors = channel_handle_errors(c); + if (!(c->channel_flags & CHANNEL_F_BUFFERED) || c->buffer->len == 0) return 0; - c->ops->send(c->data, c->buffer->data, c->buffer->len); + /* We still have pending errors to deliver, avoid any re-ordering. */ + if (pending_errors) { + channel_enqueue_errors(c); + } else { + ret = c->ops->send(c->data, c->buffer->data, c->buffer->len); + if (ret == -1 && (c->channel_flags & CHANNEL_F_ERRORS)) { + /* Give it another chance to deliver it. */ + channel_enqueue_errors(c); + } + } c->buffer->len = 0; return 1; } -- cgit v1.2.3