summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@netfilter.org>2009-01-15 23:19:58 +0100
committerPablo Neira Ayuso <pablo@netfilter.org>2009-01-15 23:19:58 +0100
commite2af183ea7e5ea35a1582f40a01a7c49e83b31be (patch)
tree5d5c5fabca580aa2851fb39c3e343b5bc324342e
parent2cacd3a802510bde43e23cf4c7d39f51a2eaf460 (diff)
sync: unify tx_list and tx_queue into one single tx_queue
This patch unifies the tx_list and the tx_queue to have only one transmission queue. Since the tx_list hold state objects and tx_queue control messages, I have introduced a queue node type that can be used to differenciate the kind of information that the node stores: object or control message. This patch also reworks the existing queue class to include a file descriptor that can be used to know if there are new data added to the queue (see QUEUE_F_EVFD flag). In this change, I have also modified the current evfd to make the file descriptor to make read operations non-blocking. Moreover, it keeps a counter that is used to know how many messages are inserted in the queue. Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
-rw-r--r--include/conntrackd.h1
-rw-r--r--include/queue.h51
-rw-r--r--include/sync.h6
-rw-r--r--src/event.c20
-rw-r--r--src/queue.c124
-rw-r--r--src/sync-ftfw.c367
-rw-r--r--src/sync-mode.c20
-rw-r--r--src/sync-notrack.c114
8 files changed, 360 insertions, 343 deletions
diff --git a/include/conntrackd.h b/include/conntrackd.h
index 67397b8..8cb520d 100644
--- a/include/conntrackd.h
+++ b/include/conntrackd.h
@@ -150,7 +150,6 @@ struct ct_sync_state {
struct mcast_sock *mcast_server; /* multicast socket: incoming */
struct mcast_sock *mcast_client; /* multicast socket: outgoing */
- struct evfd *evfd; /* event fd */
struct sync_mode *sync; /* sync mode */
diff --git a/include/queue.h b/include/queue.h
index 5a9cf39..ef56323 100644
--- a/include/queue.h
+++ b/include/queue.h
@@ -1,28 +1,53 @@
#ifndef _QUEUE_H_
#define _QUEUE_H_
+#include <stdint.h>
#include "linux_list.h"
-struct queue {
- size_t max_size;
- size_t cur_size;
- unsigned int num_elems;
- struct list_head head;
+struct queue_node {
+ struct list_head head;
+ uint32_t type;
+ struct queue *owner;
+ size_t size;
};
-struct queue_node {
- struct list_head head;
- size_t size;
- char data[0];
+enum {
+ Q_ELEM_OBJ = 0,
+ Q_ELEM_CTL = 1
+};
+
+void queue_node_init(struct queue_node *n, int type);
+void *queue_node_data(struct queue_node *n);
+
+struct queue_object {
+ struct queue_node qnode;
+ char data[0];
};
-struct queue *queue_create(size_t max_size);
+struct queue_object *queue_object_new(int type, size_t size);
+void queue_object_free(struct queue_object *obj);
+
+struct evfd;
+
+struct queue {
+ unsigned int max_elems;
+ unsigned int num_elems;
+ uint32_t flags;
+ struct list_head head;
+ struct evfd *evfd;
+};
+
+#define QUEUE_F_EVFD (1U << 0)
+
+struct queue *queue_create(int max_objects, unsigned int flags);
void queue_destroy(struct queue *b);
unsigned int queue_len(const struct queue *b);
-int queue_add(struct queue *b, const void *data, size_t size);
-void queue_del(struct queue *b, void *data);
+int queue_add(struct queue *b, struct queue_node *n);
+int queue_del(struct queue_node *n);
+int queue_in(struct queue *b, struct queue_node *n);
void queue_iterate(struct queue *b,
const void *data,
- int (*iterate)(void *data1, const void *data2));
+ int (*iterate)(struct queue_node *n, const void *data2));
+int queue_get_eventfd(struct queue *b);
#endif
diff --git a/include/sync.h b/include/sync.h
index 60c9fae..9a9540c 100644
--- a/include/sync.h
+++ b/include/sync.h
@@ -1,8 +1,11 @@
#ifndef _SYNC_HOOKS_H_
#define _SYNC_HOOKS_H_
+#include <sys/select.h>
+
struct nethdr;
struct cache_object;
+struct fds;
struct sync_mode {
int internal_cache_flags;
@@ -15,7 +18,8 @@ struct sync_mode {
int (*local)(int fd, int type, void *data);
int (*recv)(const struct nethdr *net);
void (*send)(struct nethdr *net, struct cache_object *obj);
- void (*run)(void);
+ void (*run)(fd_set *readfds);
+ int (*register_fds)(struct fds *fds);
};
extern struct sync_mode sync_alarm;
diff --git a/src/event.c b/src/event.c
index ed78835..d1dfe72 100644
--- a/src/event.c
+++ b/src/event.c
@@ -17,6 +17,8 @@
*/
#include <unistd.h>
#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
#include "event.h"
@@ -37,6 +39,7 @@ struct evfd *create_evfd(void)
free(e);
return NULL;
}
+ fcntl(e->fds[0], F_SETFL, O_NONBLOCK);
return e;
}
@@ -55,19 +58,20 @@ int get_read_evfd(struct evfd *evfd)
int write_evfd(struct evfd *evfd)
{
- int data = 0;
+ int data = 0, ret = 0;
- if (evfd->read)
- return 0;
+ if (evfd->read == 0)
+ ret = write(evfd->fds[1], &data, sizeof(data));
+ evfd->read++;
- evfd->read = 1;
- return write(evfd->fds[1], &data, sizeof(data));
+ return ret;
}
int read_evfd(struct evfd *evfd)
{
- int data;
+ int data, ret = 0;
- evfd->read = 0;
- return read(evfd->fds[0], &data, sizeof(data));
+ if (--evfd->read == 0)
+ ret = read(evfd->fds[0], &data, sizeof(data));
+ return ret;
}
diff --git a/src/queue.c b/src/queue.c
index cdd70ae..cffcc93 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -1,5 +1,5 @@
/*
- * (C) 2006-2008 by Pablo Neira Ayuso <pablo@netfilter.org>
+ * (C) 2006-2009 by Pablo Neira Ayuso <pablo@netfilter.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -17,110 +17,122 @@
*/
#include "queue.h"
+#include "event.h"
#include <errno.h>
#include <stdlib.h>
#include <string.h>
-struct queue *queue_create(size_t max_size)
+struct queue *queue_create(int max_objects, unsigned int flags)
{
struct queue *b;
- b = malloc(sizeof(struct queue));
+ b = calloc(sizeof(struct queue), 1);
if (b == NULL)
return NULL;
- memset(b, 0, sizeof(struct queue));
- b->max_size = max_size;
+ b->max_elems = max_objects;
INIT_LIST_HEAD(&b->head);
+ b->flags = flags;
+
+ if (flags & QUEUE_F_EVFD) {
+ b->evfd = create_evfd();
+ if (b->evfd == NULL) {
+ free(b);
+ return NULL;
+ }
+ }
return b;
}
void queue_destroy(struct queue *b)
{
- struct list_head *i, *tmp;
- struct queue_node *node;
-
- /* XXX: set cur_size and num_elems */
- list_for_each_safe(i, tmp, &b->head) {
- node = (struct queue_node *) i;
- list_del(i);
- free(node);
- }
+ if (b->flags & QUEUE_F_EVFD)
+ destroy_evfd(b->evfd);
free(b);
}
-static struct queue_node *queue_node_create(const void *data, size_t size)
+void queue_node_init(struct queue_node *n, int type)
{
- struct queue_node *n;
+ INIT_LIST_HEAD(&n->head);
+ n->type = type;
+}
- n = malloc(sizeof(struct queue_node) + size);
- if (n == NULL)
+void *queue_node_data(struct queue_node *n)
+{
+ return ((char *)n) + sizeof(struct queue_node);
+}
+
+struct queue_object *queue_object_new(int type, size_t size)
+{
+ struct queue_object *obj;
+
+ obj = calloc(sizeof(struct queue_object) + size, 1);
+ if (obj == NULL)
return NULL;
- n->size = size;
- memcpy(n->data, data, size);
+ obj->qnode.size = size;
+ queue_node_init(&obj->qnode, type);
- return n;
+ return obj;
}
-int queue_add(struct queue *b, const void *data, size_t size)
+void queue_object_free(struct queue_object *obj)
{
- int ret = 0;
- struct queue_node *n;
-
- /* does it fit this queue? */
- if (size > b->max_size) {
- errno = ENOSPC;
- ret = -1;
- goto err;
- }
+ free(obj);
+}
-retry:
- /* queue is full: kill the oldest entry */
- if (b->cur_size + size > b->max_size) {
- n = (struct queue_node *) b->head.prev;
- list_del(b->head.prev);
- b->cur_size -= n->size;
- free(n);
- goto retry;
- }
+int queue_add(struct queue *b, struct queue_node *n)
+{
+ if (!list_empty(&n->head))
+ return 0;
- n = queue_node_create(data, size);
- if (n == NULL) {
- ret = -1;
- goto err;
+ if (b->num_elems >= b->max_elems) {
+ errno = ENOSPC;
+ return -1;
}
-
+ n->owner = b;
list_add_tail(&n->head, &b->head);
- b->cur_size += size;
b->num_elems++;
+ if (b->evfd)
+ write_evfd(b->evfd);
+ return 1;
+}
-err:
- return ret;
+int queue_del(struct queue_node *n)
+{
+ if (list_empty(&n->head))
+ return 0;
+
+ list_del_init(&n->head);
+ n->owner->num_elems--;
+ if (n->owner->evfd)
+ read_evfd(n->owner->evfd);
+ n->owner = NULL;
+ return 1;
}
-void queue_del(struct queue *b, void *data)
+int queue_in(struct queue *b, struct queue_node *n)
{
- struct queue_node *n = container_of(data, struct queue_node, data);
+ return b == n->owner;
+}
- list_del(&n->head);
- b->cur_size -= n->size;
- b->num_elems--;
- free(n);
+int queue_get_eventfd(struct queue *b)
+{
+ return get_read_evfd(b->evfd);
}
void queue_iterate(struct queue *b,
const void *data,
- int (*iterate)(void *data1, const void *data2))
+ int (*iterate)(struct queue_node *n, const void *data2))
{
struct list_head *i, *tmp;
struct queue_node *n;
list_for_each_safe(i, tmp, &b->head) {
n = (struct queue_node *) i;
- if (iterate(n->data, data))
+ if (iterate(n, data))
break;
}
}
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index bddc18c..bb53849 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -24,7 +24,7 @@
#include "alarm.h"
#include "log.h"
#include "cache.h"
-#include "event.h"
+#include "fds.h"
#include <string.h>
@@ -34,12 +34,8 @@
#define dp(...)
#endif
-static LIST_HEAD(rs_list);
-static LIST_HEAD(tx_list);
-static unsigned int rs_list_len;
-static unsigned int tx_list_len;
-static struct queue *rs_queue;
-static struct queue *tx_queue;
+struct queue *tx_queue;
+struct queue *rs_queue;
static uint32_t exp_seq;
static uint32_t window;
static uint32_t ack_from;
@@ -58,8 +54,7 @@ static int say_hello_back;
#define ALIVE_INT 1
struct cache_ftfw {
- struct list_head rs_list;
- struct list_head tx_list;
+ struct queue_node qnode;
uint32_t seq;
};
@@ -67,24 +62,13 @@ static void cache_ftfw_add(struct cache_object *obj, void *data)
{
struct cache_ftfw *cn = data;
/* These nodes are not inserted in the list */
- INIT_LIST_HEAD(&cn->rs_list);
- INIT_LIST_HEAD(&cn->tx_list);
+ queue_node_init(&cn->qnode, Q_ELEM_OBJ);
}
static void cache_ftfw_del(struct cache_object *obj, void *data)
{
struct cache_ftfw *cn = data;
-
- /* this node is already out of the list */
- if (!list_empty(&cn->rs_list)) {
- /* no need for list_del_init since the entry is destroyed */
- list_del(&cn->rs_list);
- rs_list_len--;
- }
- if (!list_empty(&cn->tx_list)) {
- list_del(&cn->tx_list);
- tx_list_len--;
- }
+ queue_del(&cn->qnode);
}
static struct cache_extra cache_ftfw_extra = {
@@ -95,54 +79,64 @@ static struct cache_extra cache_ftfw_extra = {
static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
{
- struct nethdr_ack ack = {
- .type = NET_T_CTL,
- .flags = flags,
- .from = from,
- .to = to,
- };
+ struct queue_object *qobj;
+ struct nethdr_ack *ack;
+
+ qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
+ if (qobj == NULL)
+ return;
+
+ ack = (struct nethdr_ack *)qobj->data;
+ ack->type = NET_T_CTL;
+ ack->flags = flags;
+ ack->from = from;
+ ack->to = to;
switch(hello_state) {
case HELLO_INIT:
hello_state = HELLO_SAY;
/* fall through */
case HELLO_SAY:
- ack.flags |= NET_F_HELLO;
+ ack->flags |= NET_F_HELLO;
break;
}
if (say_hello_back) {
- ack.flags |= NET_F_HELLO_BACK;
+ ack->flags |= NET_F_HELLO_BACK;
say_hello_back = 0;
}
- queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
- write_evfd(STATE_SYNC(evfd));
+ queue_add(tx_queue, &qobj->qnode);
}
static void tx_queue_add_ctlmsg2(uint32_t flags)
{
- struct nethdr ctl = {
- .type = NET_T_CTL,
- .flags = flags,
- };
+ struct queue_object *qobj;
+ struct nethdr *ctl;
+
+ qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
+ if (qobj == NULL)
+ return;
+
+ ctl = (struct nethdr *)qobj->data;
+ ctl->type = NET_T_CTL;
+ ctl->flags = flags;
switch(hello_state) {
case HELLO_INIT:
hello_state = HELLO_SAY;
/* fall through */
case HELLO_SAY:
- ctl.flags |= NET_F_HELLO;
+ ctl->flags |= NET_F_HELLO;
break;
}
if (say_hello_back) {
- ctl.flags |= NET_F_HELLO_BACK;
+ ctl->flags |= NET_F_HELLO_BACK;
say_hello_back = 0;
}
- queue_add(tx_queue, &ctl, NETHDR_SIZ);
- write_evfd(STATE_SYNC(evfd));
+ queue_add(tx_queue, &qobj->qnode);
}
/* this function is called from the alarm framework */
@@ -156,17 +150,18 @@ static void do_alive_alarm(struct alarm_block *a, void *data)
ack_from_set = 0;
} else
tx_queue_add_ctlmsg2(NET_F_ALIVE);
+
+ add_alarm(&alive_alarm, ALIVE_INT, 0);
}
static int ftfw_init(void)
{
- tx_queue = queue_create(CONFIG(resend_queue_size));
+ tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);
if (tx_queue == NULL) {
dlog(LOG_ERR, "cannot create tx queue");
return -1;
}
-
- rs_queue = queue_create(CONFIG(resend_queue_size));
+ rs_queue = queue_create(INT_MAX, 0);
if (rs_queue == NULL) {
dlog(LOG_ERR, "cannot create rs queue");
return -1;
@@ -192,45 +187,47 @@ static int do_cache_to_tx(void *data1, void *data2)
struct cache_object *obj = data2;
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj);
- /* repeated request for resync? */
- if (!list_empty(&cn->tx_list))
- return 0;
+ if (queue_in(rs_queue, &cn->qnode))
+ queue_del(&cn->qnode);
- /* add to tx list */
- list_add_tail(&cn->tx_list, &tx_list);
- tx_list_len++;
- write_evfd(STATE_SYNC(evfd));
+ queue_add(tx_queue, &cn->qnode);
return 0;
}
-static int debug_rs_queue_dump_step(void *data1, const void *data2)
+static int rs_queue_dump(struct queue_node *n, const void *data2)
{
- struct nethdr_ack *net = data1;
const int *fd = data2;
char buf[512];
int size;
- size = sprintf(buf, "seq:%u flags:%u\n", net->seq, net->flags);
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr *net = queue_node_data(n);
+ size = sprintf(buf, "control -> seq:%u flags:%u\n",
+ net->seq, net->flags);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn = (struct cache_ftfw *) n;
+ size = sprintf(buf, "object -> seq:%u\n", cn->seq);
+ break;
+ }
+ default:
+ return 0;
+ }
send(*fd, buf, size, 0);
return 0;
}
static void debug_rs_dump(int fd)
{
- struct cache_ftfw *cn, *tmp;
char buf[512];
int size;
- size = sprintf(buf, "resent list (len=%u):\n", rs_list_len);
- send(fd, buf, size, 0);
- list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
- size = sprintf(buf, "seq:%u\n", cn->seq);
- send(fd, buf, size, 0);
- }
- size = sprintf(buf, "\nresent queue (len=%u):\n", queue_len(rs_queue));
+ size = sprintf(buf, "resent queue (len=%u):\n", queue_len(rs_queue));
send(fd, buf, size, 0);
- queue_iterate(rs_queue, &fd, debug_rs_queue_dump_step);
+ queue_iterate(rs_queue, &fd, rs_queue_dump);
}
static int ftfw_local(int fd, int type, void *data)
@@ -257,87 +254,84 @@ static int ftfw_local(int fd, int type, void *data)
return ret;
}
-static int rs_queue_to_tx(void *data1, const void *data2)
+static int rs_queue_to_tx(struct queue_node *n, const void *data)
{
- struct nethdr_ack *net = data1;
- const struct nethdr_ack *nack = data2;
-
- if (before(net->seq, nack->from))
- return 0; /* continue */
- else if (after(net->seq, nack->to))
- return 1; /* break */
-
- dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
- net->seq, net->flags, net->len);
- queue_add(tx_queue, net, net->len);
- write_evfd(STATE_SYNC(evfd));
- queue_del(rs_queue, net);
- return 0;
-}
+ const struct nethdr_ack *nack = data;
-static int rs_queue_empty(void *data1, const void *data2)
-{
- struct nethdr *net = data1;
- const struct nethdr_ack *h = data2;
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr_ack *net = queue_node_data(n);
- if (h == NULL) {
- dp("inconditional remove from queue (seq=%u)\n", net->seq);
- queue_del(rs_queue, data1);
- return 0;
+ if (before(net->seq, nack->from))
+ return 0; /* continue */
+ else if (after(net->seq, nack->to))
+ return 1; /* break */
+
+ dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
+ net->seq, net->flags, net->len);
+
+ queue_del(n);
+ queue_add(tx_queue, n);
+ break;
}
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
- if (before(net->seq, h->from))
- return 0; /* continue */
- else if (after(net->seq, h->to))
- return 1; /* break */
+ cn = (struct cache_ftfw *) n;
+ if (before(cn->seq, nack->from))
+ return 0;
+ else if (after(cn->seq, nack->to))
+ return 1;
- dp("remove from queue (seq=%u)\n", net->seq);
- queue_del(rs_queue, data1);
+ dp("resending nack'ed (oldseq=%u)\n", cn->seq);
+
+ queue_del(n);
+ queue_add(tx_queue, n);
+ break;
+ }
+ }
return 0;
}
-static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
+static int rs_queue_empty(struct queue_node *n, const void *data)
{
- struct cache_ftfw *cn, *tmp;
+ const struct nethdr_ack *h = data;
- list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
- struct cache_object *obj;;
-
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- if (before(cn->seq, from))
- continue;
- else if (after(cn->seq, to))
- break;
+ if (h == NULL) {
+ dp("inconditional remove from queue (seq=%u)\n", net->seq);
+ queue_del(n);
+ return 0;
+ }
- dp("resending nack'ed (oldseq=%u)\n", cn->seq);
- list_del_init(&cn->rs_list);
- rs_list_len--;
- /* we received a request for resync before this nack? */
- if (list_empty(&cn->tx_list)) {
- list_add_tail(&cn->tx_list, &tx_list);
- tx_list_len++;
- }
- write_evfd(STATE_SYNC(evfd));
- }
-}
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr_ack *net = queue_node_data(n);
-static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
-{
- struct cache_ftfw *cn, *tmp;
+ if (before(net->seq, h->from))
+ return 0; /* continue */
+ else if (after(net->seq, h->to))
+ return 1; /* break */
- list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
- struct cache_object *obj;
+ dp("remove from queue (seq=%u)\n", net->seq);
+ queue_del(n);
+ queue_object_free((struct queue_object *)n);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- if (before(cn->seq, from))
- continue;
- else if (after(cn->seq, to))
- break;
+ cn = (struct cache_ftfw *) n;
+ if (before(cn->seq, h->from))
+ return 0;
+ else if (after(cn->seq, h->to))
+ return 1;
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
- list_del_init(&cn->rs_list);
- rs_list_len--;
+ queue_del(n);
+ break;
}
+ }
+ return 0;
}
static int digest_msg(const struct nethdr *net)
@@ -351,7 +345,6 @@ static int digest_msg(const struct nethdr *net)
if (before(h->to, h->from))
return MSG_BAD;
- rs_list_empty(STATE_SYNC(internal), h->from, h->to);
queue_iterate(rs_queue, h, rs_queue_empty);
return MSG_CTL;
@@ -361,7 +354,6 @@ static int digest_msg(const struct nethdr *net)
if (before(nack->to, nack->from))
return MSG_BAD;
- rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
queue_iterate(rs_queue, nack, rs_queue_to_tx);
return MSG_CTL;
@@ -409,7 +401,6 @@ static int ftfw_recv(const struct nethdr *net)
* know anything about that data, we are unreliable until
* the helloing finishes */
queue_iterate(rs_queue, NULL, rs_queue_empty);
- rs_list_empty(STATE_SYNC(internal), 0, ~0U);
goto bypass;
}
@@ -480,10 +471,8 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj)
cn = (struct cache_ftfw *)
cache_get_extra(STATE_SYNC(internal), obj);
- if (!list_empty(&cn->rs_list)) {
- list_del_init(&cn->rs_list);
- rs_list_len--;
- }
+ if (queue_in(rs_queue, &cn->qnode))
+ queue_del(&cn->qnode);
switch(hello_state) {
case HELLO_INIT:
@@ -500,82 +489,77 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj)
}
cn->seq = ntohl(net->seq);
- list_add_tail(&cn->rs_list, &rs_list);
- rs_list_len++;
+ queue_add(rs_queue, &cn->qnode);
break;
}
}
-static int tx_queue_xmit(void *data1, const void *data2)
+static int tx_queue_xmit(struct queue_node *n, const void *data)
{
- struct nethdr *net = data1;
-
- if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
- nethdr_set_ack(net);
- } else if (IS_ALIVE(net)) {
- nethdr_set_ctl(net);
- } else {
- STATE_SYNC(error).msg_snd_malformed++;
- return 0;
- }
- HDR_HOST2NETWORK(net);
-
- dp("tx_queue sq: %u fl:%u len:%u\n",
- ntohl(net->seq), net->flags, ntohs(net->len));
-
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- HDR_NETWORK2HOST(net);
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr *net = queue_node_data(n);
+
+ if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
+ nethdr_set_ack(net);
+ } else if (IS_ALIVE(net)) {
+ nethdr_set_ctl(net);
+ } else {
+ STATE_SYNC(error).msg_snd_malformed++;
+ return 0;
+ }
+ HDR_HOST2NETWORK(net);
- if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net))
- queue_add(rs_queue, net, net->len);
+ dp("tx_queue sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), net->flags, ntohs(net->len));
- queue_del(tx_queue, net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ HDR_NETWORK2HOST(net);
- return 0;
-}
-
-static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type)
-{
- int ret;
- struct nethdr *net = BUILD_NETMSG(obj->ct, type);
+ queue_del(n);
+ if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net))
+ queue_add(rs_queue, n);
+ else
+ queue_object_free((struct queue_object *)n);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
+ struct cache_object *obj;
+ int type;
+ struct nethdr *net;
- dp("tx_list sq: %u fl:%u len:%u\n",
- ntohl(net->seq), net->flags, ntohs(net->len));
+ cn = (struct cache_ftfw *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ type = object_status_to_network_type(obj->status);
+ net = BUILD_NETMSG(obj->ct, type);
- list_del_init(i);
- tx_list_len--;
+ dp("tx_list sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), net->flags, ntohs(net->len));
- ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- ftfw_send(net, obj);
+ queue_del(n);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ ftfw_send(net, obj);
+ break;
+ }
+ }
- return ret;
+ return 0;
}
-static void ftfw_run(void)
+static void ftfw_run(fd_set *readfds)
{
- struct cache_ftfw *cn, *tmp;
-
- /* send messages in the tx_queue */
- queue_iterate(tx_queue, NULL, tx_queue_xmit);
-
- /* send conntracks in the tx_list */
- list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) {
- struct cache_object *obj;
-
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- if (alarm_pending(&obj->alarm))
- tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_DEL);
- else
- tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD);
+ if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) {
+ queue_iterate(tx_queue, NULL, tx_queue_xmit);
+ add_alarm(&alive_alarm, 1, 0);
+ dp("tx_queue_len:%u rs_queue_len:%u\n",
+ queue_len(tx_queue), queue_len(rs_queue));
}
+}
- /* reset alive alarm */
- add_alarm(&alive_alarm, 1, 0);
-
- dp("tx_list_len:%u tx_queue_len:%u "
- "rs_list_len: %u rs_queue_len:%u\n",
- tx_list_len, queue_len(tx_queue),
- rs_list_len, queue_len(rs_queue));
+static int ftfw_register_fds(struct fds *fds)
+{
+ return register_fd(queue_get_eventfd(tx_queue), fds);
}
struct sync_mode sync_ftfw = {
@@ -588,4 +572,5 @@ struct sync_mode sync_ftfw = {
.recv = ftfw_recv,
.send = ftfw_send,
.run = ftfw_run,
+ .register_fds = ftfw_register_fds,
};
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 368984f..711f71b 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -242,12 +242,6 @@ static int init_sync(void)
return -1;
}
- STATE_SYNC(evfd) = create_evfd();
- if (STATE_SYNC(evfd) == NULL) {
- dlog(LOG_ERR, "cannot open evfd");
- return -1;
- }
-
/* initialization of multicast sequence generation */
STATE_SYNC(last_seq_sent) = time(NULL);
@@ -259,7 +253,10 @@ static int register_fds_sync(struct fds *fds)
if (register_fd(STATE_SYNC(mcast_server->fd), fds) == -1)
return -1;
- return register_fd(get_read_evfd(STATE_SYNC(evfd)), fds);
+ if (STATE_SYNC(sync)->register_fds)
+ return STATE_SYNC(sync)->register_fds(fds);
+
+ return 0;
}
static void run_sync(fd_set *readfds)
@@ -268,11 +265,8 @@ static void run_sync(fd_set *readfds)
if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
mcast_handler();
- if (FD_ISSET(get_read_evfd(STATE_SYNC(evfd)), readfds) &&
- STATE_SYNC(sync)->run) {
- read_evfd(STATE_SYNC(evfd));
- STATE_SYNC(sync)->run();
- }
+ if (STATE_SYNC(sync)->run)
+ STATE_SYNC(sync)->run(readfds);
/* flush pending messages */
mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
@@ -286,8 +280,6 @@ static void kill_sync(void)
mcast_server_destroy(STATE_SYNC(mcast_server));
mcast_client_destroy(STATE_SYNC(mcast_client));
- destroy_evfd(STATE_SYNC(evfd));
-
mcast_buffered_destroy();
if (STATE_SYNC(sync)->kill)
diff --git a/src/sync-notrack.c b/src/sync-notrack.c
index 2d3783e..40cc199 100644
--- a/src/sync-notrack.c
+++ b/src/sync-notrack.c
@@ -23,32 +23,26 @@
#include "network.h"
#include "log.h"
#include "cache.h"
-#include "event.h"
+#include "fds.h"
#include <string.h>
-static LIST_HEAD(tx_list);
-static unsigned int tx_list_len;
static struct queue *tx_queue;
struct cache_notrack {
- struct list_head tx_list;
+ struct queue_node qnode;
};
static void cache_notrack_add(struct cache_object *obj, void *data)
{
struct cache_notrack *cn = data;
- INIT_LIST_HEAD(&cn->tx_list);
+ queue_node_init(&cn->qnode, Q_ELEM_OBJ);
}
static void cache_notrack_del(struct cache_object *obj, void *data)
{
struct cache_notrack *cn = data;
-
- if (!list_empty(&cn->tx_list)) {
- list_del(&cn->tx_list);
- tx_list_len--;
- }
+ queue_del(&cn->qnode);
}
static struct cache_extra cache_notrack_extra = {
@@ -59,20 +53,25 @@ static struct cache_extra cache_notrack_extra = {
static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
{
- struct nethdr_ack ack = {
- .type = NET_T_CTL,
- .flags = flags,
- .from = from,
- .to = to,
- };
-
- queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
- write_evfd(STATE_SYNC(evfd));
+ struct queue_object *qobj;
+ struct nethdr_ack *ack;
+
+ qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
+ if (qobj == NULL)
+ return;
+
+ ack = (struct nethdr_ack *)qobj->data;
+ ack->type = NET_T_CTL;
+ ack->flags = flags;
+ ack->from = from;
+ ack->to = to;
+
+ queue_add(tx_queue, &qobj->qnode);
}
static int notrack_init(void)
{
- tx_queue = queue_create(~0U);
+ tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);
if (tx_queue == NULL) {
dlog(LOG_ERR, "cannot create tx queue");
return -1;
@@ -90,16 +89,7 @@ static int do_cache_to_tx(void *data1, void *data2)
{
struct cache_object *obj = data2;
struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj);
-
- if (!list_empty(&cn->tx_list))
- return 0;
-
- /* add to tx list */
- list_add_tail(&cn->tx_list, &tx_list);
- tx_list_len++;
-
- write_evfd(STATE_SYNC(evfd));
-
+ queue_add(tx_queue, &cn->qnode);
return 0;
}
@@ -152,44 +142,49 @@ static int notrack_recv(const struct nethdr *net)
return ret;
}
-static int tx_queue_xmit(void *data1, const void *data2)
+static int tx_queue_xmit(struct queue_node *n, const void *data2)
{
- struct nethdr *net = data1;
- nethdr_set_ack(net);
- HDR_HOST2NETWORK(net);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- queue_del(tx_queue, net);
+ switch (n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr *net = queue_node_data(n);
+ if (IS_RESYNC(net))
+ nethdr_set_ack(net);
+ else
+ nethdr_set_ctl(net);
+ HDR_HOST2NETWORK(net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ queue_del(n);
+ queue_object_free((struct queue_object *)n);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
+ struct cache_object *obj;
+ int type;
+ struct nethdr *net;
+
+ cn = (struct cache_ftfw *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ type = object_status_to_network_type(obj->status);;
+ net = BUILD_NETMSG(obj->ct, type);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ queue_del(n);
+ break;
+ }
+ }
return 0;
}
-static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type)
+static void notrack_run(fd_set *readfds)
{
- int ret;
- struct nethdr *net = BUILD_NETMSG(obj->ct, type);
-
- list_del_init(i);
- tx_list_len--;
-
- ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
-
- return ret;
+ if (FD_ISSET(queue_get_eventfd(tx_queue), readfds))
+ queue_iterate(tx_queue, NULL, tx_queue_xmit);
}
-static void notrack_run(void)
+static int notrack_register_fds(struct fds *fds)
{
- struct cache_notrack *cn, *tmp;
-
- /* send messages in the tx_queue */
- queue_iterate(tx_queue, NULL, tx_queue_xmit);
-
- /* send conntracks in the tx_list */
- list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) {
- struct cache_object *obj;
-
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD);
- }
+ return register_fd(queue_get_eventfd(tx_queue), fds);
}
struct sync_mode sync_notrack = {
@@ -201,4 +196,5 @@ struct sync_mode sync_notrack = {
.local = notrack_local,
.recv = notrack_recv,
.run = notrack_run,
+ .register_fds = notrack_register_fds,
};