summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/channel.c119
-rw-r--r--src/read_config_lex.l1
-rw-r--r--src/read_config_yy.y18
-rw-r--r--src/sync-mode.c5
4 files changed, 135 insertions, 8 deletions
diff --git a/src/channel.c b/src/channel.c
index c442b0b..818bb01 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -13,20 +13,36 @@
#include <string.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
+#include <errno.h>
+#include "conntrackd.h"
#include "channel.h"
#include "network.h"
+#include "queue.h"
static struct channel_ops *ops[CHANNEL_MAX];
extern struct channel_ops channel_mcast;
extern struct channel_ops channel_udp;
extern struct channel_ops channel_tcp;
-void channel_init(void)
+static struct queue *errorq;
+
+int channel_init(void)
{
ops[CHANNEL_MCAST] = &channel_mcast;
ops[CHANNEL_UDP] = &channel_udp;
ops[CHANNEL_TCP] = &channel_tcp;
+
+ errorq = queue_create("errorq", CONFIG(channelc).error_queue_length, 0);
+ if (errorq == NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+void channel_end(void)
+{
+ queue_destroy(errorq);
}
struct channel_buffer {
@@ -133,9 +149,79 @@ channel_close(struct channel *c)
free(c);
}
+struct channel_error {
+ char *data;
+ int len;
+};
+
+static void channel_enqueue_errors(struct channel *c)
+{
+ struct queue_object *qobj;
+ struct channel_error *error;
+
+ qobj = queue_object_new(Q_ELEM_ERR, sizeof(struct channel_error));
+ if (qobj == NULL)
+ return;
+
+ error = (struct channel_error *)qobj->data;
+ error->len = c->buffer->len;
+
+ error->data = malloc(c->buffer->len);
+ if (error->data == NULL) {
+ queue_object_free(qobj);
+ return;
+ }
+ memcpy(error->data, c->buffer->data, c->buffer->len);
+ if (queue_add(errorq, &qobj->qnode) < 0) {
+ if (errno == ENOSPC) {
+ struct queue_node *tail;
+ struct channel_error *tmp;
+
+ tail = queue_del_head(errorq);
+ tmp = queue_node_data(tail);
+ free(tmp->data);
+ queue_object_free((struct queue_object *)tail);
+
+ queue_add(errorq, &qobj->qnode);
+ }
+ }
+}
+
+static int channel_handle_error_step(struct queue_node *n, const void *data2)
+{
+ struct channel_error *error;
+ const struct channel *c = data2;
+ int ret;
+
+ error = queue_node_data(n);
+ ret = c->ops->send(c->data, error->data, error->len);
+ if (ret != -1) {
+ /* Success. Delete it from the error queue. */
+ queue_del(n);
+ free(error->data);
+ queue_object_free((struct queue_object *)n);
+ } else {
+ /* We failed to deliver, give up now, try later. */
+ return 1;
+ }
+ return 0;
+}
+
+static int channel_handle_errors(struct channel *c)
+{
+ /* there are pending errors that we have to handle. */
+ if (c->channel_flags & CHANNEL_F_ERRORS && queue_len(errorq) > 0) {
+ queue_iterate(errorq, c, channel_handle_error_step);
+ return queue_len(errorq) > 0;
+ }
+ return 0;
+}
+
int channel_send(struct channel *c, const struct nethdr *net)
{
- int ret = 0, len = ntohs(net->len);
+ int ret = 0, len = ntohs(net->len), pending_errors;
+
+ pending_errors = channel_handle_errors(c);
if (!(c->channel_flags & CHANNEL_F_BUFFERED)) {
c->ops->send(c->data, net, len);
@@ -146,7 +232,19 @@ retry:
memcpy(c->buffer->data + c->buffer->len, net, len);
c->buffer->len += len;
} else {
- c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ /* We've got pending packets to deliver, enqueue this
+ * packet to avoid possible re-ordering. */
+ if (pending_errors) {
+ channel_enqueue_errors(c);
+ } else {
+ ret = c->ops->send(c->data, c->buffer->data,
+ c->buffer->len);
+ if (ret == -1 &&
+ (c->channel_flags & CHANNEL_F_ERRORS)) {
+ /* Give it another chance to deliver. */
+ channel_enqueue_errors(c);
+ }
+ }
ret = 1;
c->buffer->len = 0;
goto retry;
@@ -156,10 +254,23 @@ retry:
int channel_send_flush(struct channel *c)
{
+ int ret, pending_errors;
+
+ pending_errors = channel_handle_errors(c);
+
if (!(c->channel_flags & CHANNEL_F_BUFFERED) || c->buffer->len == 0)
return 0;
- c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ /* We still have pending errors to deliver, avoid any re-ordering. */
+ if (pending_errors) {
+ channel_enqueue_errors(c);
+ } else {
+ ret = c->ops->send(c->data, c->buffer->data, c->buffer->len);
+ if (ret == -1 && (c->channel_flags & CHANNEL_F_ERRORS)) {
+ /* Give it another chance to deliver it. */
+ channel_enqueue_errors(c);
+ }
+ }
c->buffer->len = 0;
return 1;
}
diff --git a/src/read_config_lex.l b/src/read_config_lex.l
index 9c53c6c..b4be6f0 100644
--- a/src/read_config_lex.l
+++ b/src/read_config_lex.l
@@ -137,6 +137,7 @@ notrack [N|n][O|o][T|t][R|r][A|a][C|c][K|k]
"Priority" { return T_PRIO; }
"NetlinkEventsReliable" { return T_NETLINK_EVENTS_RELIABLE; }
"DisableExternalCache" { return T_DISABLE_EXTERNAL_CACHE; }
+"ErrorQueueLength" { return T_ERROR_QUEUE_LENGTH; }
{is_on} { return T_ON; }
{is_off} { return T_OFF; }
diff --git a/src/read_config_yy.y b/src/read_config_yy.y
index 0804689..5075cf0 100644
--- a/src/read_config_yy.y
+++ b/src/read_config_yy.y
@@ -72,7 +72,7 @@ static void __max_dedicated_links_reached(void);
%token T_FROM T_USERSPACE T_KERNELSPACE T_EVENT_ITER_LIMIT T_DEFAULT
%token T_NETLINK_OVERRUN_RESYNC T_NICE T_IPV4_DEST_ADDR T_IPV6_DEST_ADDR
%token T_SCHEDULER T_TYPE T_PRIO T_NETLINK_EVENTS_RELIABLE
-%token T_DISABLE_EXTERNAL_CACHE
+%token T_DISABLE_EXTERNAL_CACHE T_ERROR_QUEUE_LENGTH
%token <string> T_IP T_PATH_VAL
%token <val> T_NUMBER
@@ -584,7 +584,8 @@ tcp_line : T_TCP '{' tcp_options '}'
conf.channel_type_global = CHANNEL_TCP;
conf.channel[conf.channel_num].channel_type = CHANNEL_TCP;
conf.channel[conf.channel_num].channel_flags = CHANNEL_F_BUFFERED |
- CHANNEL_F_STREAM;
+ CHANNEL_F_STREAM |
+ CHANNEL_F_ERRORS;
conf.channel_num++;
};
@@ -600,7 +601,8 @@ tcp_line : T_TCP T_DEFAULT '{' tcp_options '}'
conf.channel[conf.channel_num].channel_type = CHANNEL_TCP;
conf.channel[conf.channel_num].channel_flags = CHANNEL_F_DEFAULT |
CHANNEL_F_BUFFERED |
- CHANNEL_F_STREAM;
+ CHANNEL_F_STREAM |
+ CHANNEL_F_ERRORS;
conf.channel_default = conf.channel_num;
conf.channel_num++;
};
@@ -709,6 +711,12 @@ tcp_option: T_CHECKSUM T_OFF
conf.channel[conf.channel_num].u.tcp.checksum = 1;
};
+tcp_option: T_ERROR_QUEUE_LENGTH T_NUMBER
+{
+ __max_dedicated_links_reached();
+ CONFIG(channelc).error_queue_length = $2;
+};
+
hashsize : T_HASHSIZE T_NUMBER
{
conf.hashsize = $2;
@@ -1583,5 +1591,9 @@ init_config(char *filename)
if (CONFIG(nl_overrun_resync) == 0)
CONFIG(nl_overrun_resync) = 30;
+ /* default to 128 elements in the channel error queue */
+ if (CONFIG(channelc).error_queue_length == 0)
+ CONFIG(channelc).error_queue_length = 128;
+
return 0;
}
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 6781f10..63fae68 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -295,7 +295,8 @@ static int init_sync(void)
if (STATE_SYNC(external)->init() == -1)
return -1;
- channel_init();
+ if (channel_init() == -1)
+ return -1;
/* channel to send events on the wire */
STATE_SYNC(channel) =
@@ -397,6 +398,8 @@ static void kill_sync(void)
queue_destroy(STATE_SYNC(tx_queue));
+ channel_end();
+
origin_unregister(STATE_SYNC(commit).h);
nfct_close(STATE_SYNC(commit).h);
destroy_evfd(STATE_SYNC(commit).evfd);