From e2af183ea7e5ea35a1582f40a01a7c49e83b31be Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Thu, 15 Jan 2009 23:19:58 +0100 Subject: sync: unify tx_list and tx_queue into one single tx_queue This patch unifies the tx_list and the tx_queue to have only one transmission queue. Since the tx_list hold state objects and tx_queue control messages, I have introduced a queue node type that can be used to differenciate the kind of information that the node stores: object or control message. This patch also reworks the existing queue class to include a file descriptor that can be used to know if there are new data added to the queue (see QUEUE_F_EVFD flag). In this change, I have also modified the current evfd to make the file descriptor to make read operations non-blocking. Moreover, it keeps a counter that is used to know how many messages are inserted in the queue. Signed-off-by: Pablo Neira Ayuso --- include/conntrackd.h | 1 - include/queue.h | 51 +++++-- include/sync.h | 6 +- src/event.c | 20 +-- src/queue.c | 124 +++++++++-------- src/sync-ftfw.c | 367 ++++++++++++++++++++++++--------------------------- src/sync-mode.c | 20 +-- src/sync-notrack.c | 114 ++++++++-------- 8 files changed, 360 insertions(+), 343 deletions(-) diff --git a/include/conntrackd.h b/include/conntrackd.h index 67397b8..8cb520d 100644 --- a/include/conntrackd.h +++ b/include/conntrackd.h @@ -150,7 +150,6 @@ struct ct_sync_state { struct mcast_sock *mcast_server; /* multicast socket: incoming */ struct mcast_sock *mcast_client; /* multicast socket: outgoing */ - struct evfd *evfd; /* event fd */ struct sync_mode *sync; /* sync mode */ diff --git a/include/queue.h b/include/queue.h index 5a9cf39..ef56323 100644 --- a/include/queue.h +++ b/include/queue.h @@ -1,28 +1,53 @@ #ifndef _QUEUE_H_ #define _QUEUE_H_ +#include #include "linux_list.h" -struct queue { - size_t max_size; - size_t cur_size; - unsigned int num_elems; - struct list_head head; +struct queue_node { + struct list_head head; + uint32_t type; + struct queue *owner; + size_t size; }; -struct queue_node { - struct list_head head; - size_t size; - char data[0]; +enum { + Q_ELEM_OBJ = 0, + Q_ELEM_CTL = 1 +}; + +void queue_node_init(struct queue_node *n, int type); +void *queue_node_data(struct queue_node *n); + +struct queue_object { + struct queue_node qnode; + char data[0]; }; -struct queue *queue_create(size_t max_size); +struct queue_object *queue_object_new(int type, size_t size); +void queue_object_free(struct queue_object *obj); + +struct evfd; + +struct queue { + unsigned int max_elems; + unsigned int num_elems; + uint32_t flags; + struct list_head head; + struct evfd *evfd; +}; + +#define QUEUE_F_EVFD (1U << 0) + +struct queue *queue_create(int max_objects, unsigned int flags); void queue_destroy(struct queue *b); unsigned int queue_len(const struct queue *b); -int queue_add(struct queue *b, const void *data, size_t size); -void queue_del(struct queue *b, void *data); +int queue_add(struct queue *b, struct queue_node *n); +int queue_del(struct queue_node *n); +int queue_in(struct queue *b, struct queue_node *n); void queue_iterate(struct queue *b, const void *data, - int (*iterate)(void *data1, const void *data2)); + int (*iterate)(struct queue_node *n, const void *data2)); +int queue_get_eventfd(struct queue *b); #endif diff --git a/include/sync.h b/include/sync.h index 60c9fae..9a9540c 100644 --- a/include/sync.h +++ b/include/sync.h @@ -1,8 +1,11 @@ #ifndef _SYNC_HOOKS_H_ #define _SYNC_HOOKS_H_ +#include + struct nethdr; struct cache_object; +struct fds; struct sync_mode { int internal_cache_flags; @@ -15,7 +18,8 @@ struct sync_mode { int (*local)(int fd, int type, void *data); int (*recv)(const struct nethdr *net); void (*send)(struct nethdr *net, struct cache_object *obj); - void (*run)(void); + void (*run)(fd_set *readfds); + int (*register_fds)(struct fds *fds); }; extern struct sync_mode sync_alarm; diff --git a/src/event.c b/src/event.c index ed78835..d1dfe72 100644 --- a/src/event.c +++ b/src/event.c @@ -17,6 +17,8 @@ */ #include #include +#include +#include #include "event.h" @@ -37,6 +39,7 @@ struct evfd *create_evfd(void) free(e); return NULL; } + fcntl(e->fds[0], F_SETFL, O_NONBLOCK); return e; } @@ -55,19 +58,20 @@ int get_read_evfd(struct evfd *evfd) int write_evfd(struct evfd *evfd) { - int data = 0; + int data = 0, ret = 0; - if (evfd->read) - return 0; + if (evfd->read == 0) + ret = write(evfd->fds[1], &data, sizeof(data)); + evfd->read++; - evfd->read = 1; - return write(evfd->fds[1], &data, sizeof(data)); + return ret; } int read_evfd(struct evfd *evfd) { - int data; + int data, ret = 0; - evfd->read = 0; - return read(evfd->fds[0], &data, sizeof(data)); + if (--evfd->read == 0) + ret = read(evfd->fds[0], &data, sizeof(data)); + return ret; } diff --git a/src/queue.c b/src/queue.c index cdd70ae..cffcc93 100644 --- a/src/queue.c +++ b/src/queue.c @@ -1,5 +1,5 @@ /* - * (C) 2006-2008 by Pablo Neira Ayuso + * (C) 2006-2009 by Pablo Neira Ayuso * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -17,110 +17,122 @@ */ #include "queue.h" +#include "event.h" #include #include #include -struct queue *queue_create(size_t max_size) +struct queue *queue_create(int max_objects, unsigned int flags) { struct queue *b; - b = malloc(sizeof(struct queue)); + b = calloc(sizeof(struct queue), 1); if (b == NULL) return NULL; - memset(b, 0, sizeof(struct queue)); - b->max_size = max_size; + b->max_elems = max_objects; INIT_LIST_HEAD(&b->head); + b->flags = flags; + + if (flags & QUEUE_F_EVFD) { + b->evfd = create_evfd(); + if (b->evfd == NULL) { + free(b); + return NULL; + } + } return b; } void queue_destroy(struct queue *b) { - struct list_head *i, *tmp; - struct queue_node *node; - - /* XXX: set cur_size and num_elems */ - list_for_each_safe(i, tmp, &b->head) { - node = (struct queue_node *) i; - list_del(i); - free(node); - } + if (b->flags & QUEUE_F_EVFD) + destroy_evfd(b->evfd); free(b); } -static struct queue_node *queue_node_create(const void *data, size_t size) +void queue_node_init(struct queue_node *n, int type) { - struct queue_node *n; + INIT_LIST_HEAD(&n->head); + n->type = type; +} - n = malloc(sizeof(struct queue_node) + size); - if (n == NULL) +void *queue_node_data(struct queue_node *n) +{ + return ((char *)n) + sizeof(struct queue_node); +} + +struct queue_object *queue_object_new(int type, size_t size) +{ + struct queue_object *obj; + + obj = calloc(sizeof(struct queue_object) + size, 1); + if (obj == NULL) return NULL; - n->size = size; - memcpy(n->data, data, size); + obj->qnode.size = size; + queue_node_init(&obj->qnode, type); - return n; + return obj; } -int queue_add(struct queue *b, const void *data, size_t size) +void queue_object_free(struct queue_object *obj) { - int ret = 0; - struct queue_node *n; - - /* does it fit this queue? */ - if (size > b->max_size) { - errno = ENOSPC; - ret = -1; - goto err; - } + free(obj); +} -retry: - /* queue is full: kill the oldest entry */ - if (b->cur_size + size > b->max_size) { - n = (struct queue_node *) b->head.prev; - list_del(b->head.prev); - b->cur_size -= n->size; - free(n); - goto retry; - } +int queue_add(struct queue *b, struct queue_node *n) +{ + if (!list_empty(&n->head)) + return 0; - n = queue_node_create(data, size); - if (n == NULL) { - ret = -1; - goto err; + if (b->num_elems >= b->max_elems) { + errno = ENOSPC; + return -1; } - + n->owner = b; list_add_tail(&n->head, &b->head); - b->cur_size += size; b->num_elems++; + if (b->evfd) + write_evfd(b->evfd); + return 1; +} -err: - return ret; +int queue_del(struct queue_node *n) +{ + if (list_empty(&n->head)) + return 0; + + list_del_init(&n->head); + n->owner->num_elems--; + if (n->owner->evfd) + read_evfd(n->owner->evfd); + n->owner = NULL; + return 1; } -void queue_del(struct queue *b, void *data) +int queue_in(struct queue *b, struct queue_node *n) { - struct queue_node *n = container_of(data, struct queue_node, data); + return b == n->owner; +} - list_del(&n->head); - b->cur_size -= n->size; - b->num_elems--; - free(n); +int queue_get_eventfd(struct queue *b) +{ + return get_read_evfd(b->evfd); } void queue_iterate(struct queue *b, const void *data, - int (*iterate)(void *data1, const void *data2)) + int (*iterate)(struct queue_node *n, const void *data2)) { struct list_head *i, *tmp; struct queue_node *n; list_for_each_safe(i, tmp, &b->head) { n = (struct queue_node *) i; - if (iterate(n->data, data)) + if (iterate(n, data)) break; } } diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index bddc18c..bb53849 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -24,7 +24,7 @@ #include "alarm.h" #include "log.h" #include "cache.h" -#include "event.h" +#include "fds.h" #include @@ -34,12 +34,8 @@ #define dp(...) #endif -static LIST_HEAD(rs_list); -static LIST_HEAD(tx_list); -static unsigned int rs_list_len; -static unsigned int tx_list_len; -static struct queue *rs_queue; -static struct queue *tx_queue; +struct queue *tx_queue; +struct queue *rs_queue; static uint32_t exp_seq; static uint32_t window; static uint32_t ack_from; @@ -58,8 +54,7 @@ static int say_hello_back; #define ALIVE_INT 1 struct cache_ftfw { - struct list_head rs_list; - struct list_head tx_list; + struct queue_node qnode; uint32_t seq; }; @@ -67,24 +62,13 @@ static void cache_ftfw_add(struct cache_object *obj, void *data) { struct cache_ftfw *cn = data; /* These nodes are not inserted in the list */ - INIT_LIST_HEAD(&cn->rs_list); - INIT_LIST_HEAD(&cn->tx_list); + queue_node_init(&cn->qnode, Q_ELEM_OBJ); } static void cache_ftfw_del(struct cache_object *obj, void *data) { struct cache_ftfw *cn = data; - - /* this node is already out of the list */ - if (!list_empty(&cn->rs_list)) { - /* no need for list_del_init since the entry is destroyed */ - list_del(&cn->rs_list); - rs_list_len--; - } - if (!list_empty(&cn->tx_list)) { - list_del(&cn->tx_list); - tx_list_len--; - } + queue_del(&cn->qnode); } static struct cache_extra cache_ftfw_extra = { @@ -95,54 +79,64 @@ static struct cache_extra cache_ftfw_extra = { static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) { - struct nethdr_ack ack = { - .type = NET_T_CTL, - .flags = flags, - .from = from, - .to = to, - }; + struct queue_object *qobj; + struct nethdr_ack *ack; + + qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); + if (qobj == NULL) + return; + + ack = (struct nethdr_ack *)qobj->data; + ack->type = NET_T_CTL; + ack->flags = flags; + ack->from = from; + ack->to = to; switch(hello_state) { case HELLO_INIT: hello_state = HELLO_SAY; /* fall through */ case HELLO_SAY: - ack.flags |= NET_F_HELLO; + ack->flags |= NET_F_HELLO; break; } if (say_hello_back) { - ack.flags |= NET_F_HELLO_BACK; + ack->flags |= NET_F_HELLO_BACK; say_hello_back = 0; } - queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); - write_evfd(STATE_SYNC(evfd)); + queue_add(tx_queue, &qobj->qnode); } static void tx_queue_add_ctlmsg2(uint32_t flags) { - struct nethdr ctl = { - .type = NET_T_CTL, - .flags = flags, - }; + struct queue_object *qobj; + struct nethdr *ctl; + + qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); + if (qobj == NULL) + return; + + ctl = (struct nethdr *)qobj->data; + ctl->type = NET_T_CTL; + ctl->flags = flags; switch(hello_state) { case HELLO_INIT: hello_state = HELLO_SAY; /* fall through */ case HELLO_SAY: - ctl.flags |= NET_F_HELLO; + ctl->flags |= NET_F_HELLO; break; } if (say_hello_back) { - ctl.flags |= NET_F_HELLO_BACK; + ctl->flags |= NET_F_HELLO_BACK; say_hello_back = 0; } - queue_add(tx_queue, &ctl, NETHDR_SIZ); - write_evfd(STATE_SYNC(evfd)); + queue_add(tx_queue, &qobj->qnode); } /* this function is called from the alarm framework */ @@ -156,17 +150,18 @@ static void do_alive_alarm(struct alarm_block *a, void *data) ack_from_set = 0; } else tx_queue_add_ctlmsg2(NET_F_ALIVE); + + add_alarm(&alive_alarm, ALIVE_INT, 0); } static int ftfw_init(void) { - tx_queue = queue_create(CONFIG(resend_queue_size)); + tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD); if (tx_queue == NULL) { dlog(LOG_ERR, "cannot create tx queue"); return -1; } - - rs_queue = queue_create(CONFIG(resend_queue_size)); + rs_queue = queue_create(INT_MAX, 0); if (rs_queue == NULL) { dlog(LOG_ERR, "cannot create rs queue"); return -1; @@ -192,45 +187,47 @@ static int do_cache_to_tx(void *data1, void *data2) struct cache_object *obj = data2; struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); - /* repeated request for resync? */ - if (!list_empty(&cn->tx_list)) - return 0; + if (queue_in(rs_queue, &cn->qnode)) + queue_del(&cn->qnode); - /* add to tx list */ - list_add_tail(&cn->tx_list, &tx_list); - tx_list_len++; - write_evfd(STATE_SYNC(evfd)); + queue_add(tx_queue, &cn->qnode); return 0; } -static int debug_rs_queue_dump_step(void *data1, const void *data2) +static int rs_queue_dump(struct queue_node *n, const void *data2) { - struct nethdr_ack *net = data1; const int *fd = data2; char buf[512]; int size; - size = sprintf(buf, "seq:%u flags:%u\n", net->seq, net->flags); + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr *net = queue_node_data(n); + size = sprintf(buf, "control -> seq:%u flags:%u\n", + net->seq, net->flags); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn = (struct cache_ftfw *) n; + size = sprintf(buf, "object -> seq:%u\n", cn->seq); + break; + } + default: + return 0; + } send(*fd, buf, size, 0); return 0; } static void debug_rs_dump(int fd) { - struct cache_ftfw *cn, *tmp; char buf[512]; int size; - size = sprintf(buf, "resent list (len=%u):\n", rs_list_len); - send(fd, buf, size, 0); - list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { - size = sprintf(buf, "seq:%u\n", cn->seq); - send(fd, buf, size, 0); - } - size = sprintf(buf, "\nresent queue (len=%u):\n", queue_len(rs_queue)); + size = sprintf(buf, "resent queue (len=%u):\n", queue_len(rs_queue)); send(fd, buf, size, 0); - queue_iterate(rs_queue, &fd, debug_rs_queue_dump_step); + queue_iterate(rs_queue, &fd, rs_queue_dump); } static int ftfw_local(int fd, int type, void *data) @@ -257,87 +254,84 @@ static int ftfw_local(int fd, int type, void *data) return ret; } -static int rs_queue_to_tx(void *data1, const void *data2) +static int rs_queue_to_tx(struct queue_node *n, const void *data) { - struct nethdr_ack *net = data1; - const struct nethdr_ack *nack = data2; - - if (before(net->seq, nack->from)) - return 0; /* continue */ - else if (after(net->seq, nack->to)) - return 1; /* break */ - - dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", - net->seq, net->flags, net->len); - queue_add(tx_queue, net, net->len); - write_evfd(STATE_SYNC(evfd)); - queue_del(rs_queue, net); - return 0; -} + const struct nethdr_ack *nack = data; -static int rs_queue_empty(void *data1, const void *data2) -{ - struct nethdr *net = data1; - const struct nethdr_ack *h = data2; + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr_ack *net = queue_node_data(n); - if (h == NULL) { - dp("inconditional remove from queue (seq=%u)\n", net->seq); - queue_del(rs_queue, data1); - return 0; + if (before(net->seq, nack->from)) + return 0; /* continue */ + else if (after(net->seq, nack->to)) + return 1; /* break */ + + dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", + net->seq, net->flags, net->len); + + queue_del(n); + queue_add(tx_queue, n); + break; } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; - if (before(net->seq, h->from)) - return 0; /* continue */ - else if (after(net->seq, h->to)) - return 1; /* break */ + cn = (struct cache_ftfw *) n; + if (before(cn->seq, nack->from)) + return 0; + else if (after(cn->seq, nack->to)) + return 1; - dp("remove from queue (seq=%u)\n", net->seq); - queue_del(rs_queue, data1); + dp("resending nack'ed (oldseq=%u)\n", cn->seq); + + queue_del(n); + queue_add(tx_queue, n); + break; + } + } return 0; } -static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) +static int rs_queue_empty(struct queue_node *n, const void *data) { - struct cache_ftfw *cn, *tmp; + const struct nethdr_ack *h = data; - list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { - struct cache_object *obj;; - - obj = cache_data_get_object(STATE_SYNC(internal), cn); - if (before(cn->seq, from)) - continue; - else if (after(cn->seq, to)) - break; + if (h == NULL) { + dp("inconditional remove from queue (seq=%u)\n", net->seq); + queue_del(n); + return 0; + } - dp("resending nack'ed (oldseq=%u)\n", cn->seq); - list_del_init(&cn->rs_list); - rs_list_len--; - /* we received a request for resync before this nack? */ - if (list_empty(&cn->tx_list)) { - list_add_tail(&cn->tx_list, &tx_list); - tx_list_len++; - } - write_evfd(STATE_SYNC(evfd)); - } -} + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr_ack *net = queue_node_data(n); -static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) -{ - struct cache_ftfw *cn, *tmp; + if (before(net->seq, h->from)) + return 0; /* continue */ + else if (after(net->seq, h->to)) + return 1; /* break */ - list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { - struct cache_object *obj; + dp("remove from queue (seq=%u)\n", net->seq); + queue_del(n); + queue_object_free((struct queue_object *)n); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; - obj = cache_data_get_object(STATE_SYNC(internal), cn); - if (before(cn->seq, from)) - continue; - else if (after(cn->seq, to)) - break; + cn = (struct cache_ftfw *) n; + if (before(cn->seq, h->from)) + return 0; + else if (after(cn->seq, h->to)) + return 1; dp("queue: deleting from queue (seq=%u)\n", cn->seq); - list_del_init(&cn->rs_list); - rs_list_len--; + queue_del(n); + break; } + } + return 0; } static int digest_msg(const struct nethdr *net) @@ -351,7 +345,6 @@ static int digest_msg(const struct nethdr *net) if (before(h->to, h->from)) return MSG_BAD; - rs_list_empty(STATE_SYNC(internal), h->from, h->to); queue_iterate(rs_queue, h, rs_queue_empty); return MSG_CTL; @@ -361,7 +354,6 @@ static int digest_msg(const struct nethdr *net) if (before(nack->to, nack->from)) return MSG_BAD; - rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); queue_iterate(rs_queue, nack, rs_queue_to_tx); return MSG_CTL; @@ -409,7 +401,6 @@ static int ftfw_recv(const struct nethdr *net) * know anything about that data, we are unreliable until * the helloing finishes */ queue_iterate(rs_queue, NULL, rs_queue_empty); - rs_list_empty(STATE_SYNC(internal), 0, ~0U); goto bypass; } @@ -480,10 +471,8 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj) cn = (struct cache_ftfw *) cache_get_extra(STATE_SYNC(internal), obj); - if (!list_empty(&cn->rs_list)) { - list_del_init(&cn->rs_list); - rs_list_len--; - } + if (queue_in(rs_queue, &cn->qnode)) + queue_del(&cn->qnode); switch(hello_state) { case HELLO_INIT: @@ -500,82 +489,77 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj) } cn->seq = ntohl(net->seq); - list_add_tail(&cn->rs_list, &rs_list); - rs_list_len++; + queue_add(rs_queue, &cn->qnode); break; } } -static int tx_queue_xmit(void *data1, const void *data2) +static int tx_queue_xmit(struct queue_node *n, const void *data) { - struct nethdr *net = data1; - - if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { - nethdr_set_ack(net); - } else if (IS_ALIVE(net)) { - nethdr_set_ctl(net); - } else { - STATE_SYNC(error).msg_snd_malformed++; - return 0; - } - HDR_HOST2NETWORK(net); - - dp("tx_queue sq: %u fl:%u len:%u\n", - ntohl(net->seq), net->flags, ntohs(net->len)); - - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - HDR_NETWORK2HOST(net); + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr *net = queue_node_data(n); + + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { + nethdr_set_ack(net); + } else if (IS_ALIVE(net)) { + nethdr_set_ctl(net); + } else { + STATE_SYNC(error).msg_snd_malformed++; + return 0; + } + HDR_HOST2NETWORK(net); - if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) - queue_add(rs_queue, net, net->len); + dp("tx_queue sq: %u fl:%u len:%u\n", + ntohl(net->seq), net->flags, ntohs(net->len)); - queue_del(tx_queue, net); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + HDR_NETWORK2HOST(net); - return 0; -} - -static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type) -{ - int ret; - struct nethdr *net = BUILD_NETMSG(obj->ct, type); + queue_del(n); + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) + queue_add(rs_queue, n); + else + queue_object_free((struct queue_object *)n); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; + struct cache_object *obj; + int type; + struct nethdr *net; - dp("tx_list sq: %u fl:%u len:%u\n", - ntohl(net->seq), net->flags, ntohs(net->len)); + cn = (struct cache_ftfw *)n; + obj = cache_data_get_object(STATE_SYNC(internal), cn); + type = object_status_to_network_type(obj->status); + net = BUILD_NETMSG(obj->ct, type); - list_del_init(i); - tx_list_len--; + dp("tx_list sq: %u fl:%u len:%u\n", + ntohl(net->seq), net->flags, ntohs(net->len)); - ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - ftfw_send(net, obj); + queue_del(n); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + ftfw_send(net, obj); + break; + } + } - return ret; + return 0; } -static void ftfw_run(void) +static void ftfw_run(fd_set *readfds) { - struct cache_ftfw *cn, *tmp; - - /* send messages in the tx_queue */ - queue_iterate(tx_queue, NULL, tx_queue_xmit); - - /* send conntracks in the tx_list */ - list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) { - struct cache_object *obj; - - obj = cache_data_get_object(STATE_SYNC(internal), cn); - if (alarm_pending(&obj->alarm)) - tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_DEL); - else - tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD); + if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) { + queue_iterate(tx_queue, NULL, tx_queue_xmit); + add_alarm(&alive_alarm, 1, 0); + dp("tx_queue_len:%u rs_queue_len:%u\n", + queue_len(tx_queue), queue_len(rs_queue)); } +} - /* reset alive alarm */ - add_alarm(&alive_alarm, 1, 0); - - dp("tx_list_len:%u tx_queue_len:%u " - "rs_list_len: %u rs_queue_len:%u\n", - tx_list_len, queue_len(tx_queue), - rs_list_len, queue_len(rs_queue)); +static int ftfw_register_fds(struct fds *fds) +{ + return register_fd(queue_get_eventfd(tx_queue), fds); } struct sync_mode sync_ftfw = { @@ -588,4 +572,5 @@ struct sync_mode sync_ftfw = { .recv = ftfw_recv, .send = ftfw_send, .run = ftfw_run, + .register_fds = ftfw_register_fds, }; diff --git a/src/sync-mode.c b/src/sync-mode.c index 368984f..711f71b 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -242,12 +242,6 @@ static int init_sync(void) return -1; } - STATE_SYNC(evfd) = create_evfd(); - if (STATE_SYNC(evfd) == NULL) { - dlog(LOG_ERR, "cannot open evfd"); - return -1; - } - /* initialization of multicast sequence generation */ STATE_SYNC(last_seq_sent) = time(NULL); @@ -259,7 +253,10 @@ static int register_fds_sync(struct fds *fds) if (register_fd(STATE_SYNC(mcast_server->fd), fds) == -1) return -1; - return register_fd(get_read_evfd(STATE_SYNC(evfd)), fds); + if (STATE_SYNC(sync)->register_fds) + return STATE_SYNC(sync)->register_fds(fds); + + return 0; } static void run_sync(fd_set *readfds) @@ -268,11 +265,8 @@ static void run_sync(fd_set *readfds) if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) mcast_handler(); - if (FD_ISSET(get_read_evfd(STATE_SYNC(evfd)), readfds) && - STATE_SYNC(sync)->run) { - read_evfd(STATE_SYNC(evfd)); - STATE_SYNC(sync)->run(); - } + if (STATE_SYNC(sync)->run) + STATE_SYNC(sync)->run(readfds); /* flush pending messages */ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); @@ -286,8 +280,6 @@ static void kill_sync(void) mcast_server_destroy(STATE_SYNC(mcast_server)); mcast_client_destroy(STATE_SYNC(mcast_client)); - destroy_evfd(STATE_SYNC(evfd)); - mcast_buffered_destroy(); if (STATE_SYNC(sync)->kill) diff --git a/src/sync-notrack.c b/src/sync-notrack.c index 2d3783e..40cc199 100644 --- a/src/sync-notrack.c +++ b/src/sync-notrack.c @@ -23,32 +23,26 @@ #include "network.h" #include "log.h" #include "cache.h" -#include "event.h" +#include "fds.h" #include -static LIST_HEAD(tx_list); -static unsigned int tx_list_len; static struct queue *tx_queue; struct cache_notrack { - struct list_head tx_list; + struct queue_node qnode; }; static void cache_notrack_add(struct cache_object *obj, void *data) { struct cache_notrack *cn = data; - INIT_LIST_HEAD(&cn->tx_list); + queue_node_init(&cn->qnode, Q_ELEM_OBJ); } static void cache_notrack_del(struct cache_object *obj, void *data) { struct cache_notrack *cn = data; - - if (!list_empty(&cn->tx_list)) { - list_del(&cn->tx_list); - tx_list_len--; - } + queue_del(&cn->qnode); } static struct cache_extra cache_notrack_extra = { @@ -59,20 +53,25 @@ static struct cache_extra cache_notrack_extra = { static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) { - struct nethdr_ack ack = { - .type = NET_T_CTL, - .flags = flags, - .from = from, - .to = to, - }; - - queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); - write_evfd(STATE_SYNC(evfd)); + struct queue_object *qobj; + struct nethdr_ack *ack; + + qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); + if (qobj == NULL) + return; + + ack = (struct nethdr_ack *)qobj->data; + ack->type = NET_T_CTL; + ack->flags = flags; + ack->from = from; + ack->to = to; + + queue_add(tx_queue, &qobj->qnode); } static int notrack_init(void) { - tx_queue = queue_create(~0U); + tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD); if (tx_queue == NULL) { dlog(LOG_ERR, "cannot create tx queue"); return -1; @@ -90,16 +89,7 @@ static int do_cache_to_tx(void *data1, void *data2) { struct cache_object *obj = data2; struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); - - if (!list_empty(&cn->tx_list)) - return 0; - - /* add to tx list */ - list_add_tail(&cn->tx_list, &tx_list); - tx_list_len++; - - write_evfd(STATE_SYNC(evfd)); - + queue_add(tx_queue, &cn->qnode); return 0; } @@ -152,44 +142,49 @@ static int notrack_recv(const struct nethdr *net) return ret; } -static int tx_queue_xmit(void *data1, const void *data2) +static int tx_queue_xmit(struct queue_node *n, const void *data2) { - struct nethdr *net = data1; - nethdr_set_ack(net); - HDR_HOST2NETWORK(net); - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - queue_del(tx_queue, net); + switch (n->type) { + case Q_ELEM_CTL: { + struct nethdr *net = queue_node_data(n); + if (IS_RESYNC(net)) + nethdr_set_ack(net); + else + nethdr_set_ctl(net); + HDR_HOST2NETWORK(net); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + queue_del(n); + queue_object_free((struct queue_object *)n); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; + struct cache_object *obj; + int type; + struct nethdr *net; + + cn = (struct cache_ftfw *)n; + obj = cache_data_get_object(STATE_SYNC(internal), cn); + type = object_status_to_network_type(obj->status);; + net = BUILD_NETMSG(obj->ct, type); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + queue_del(n); + break; + } + } return 0; } -static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type) +static void notrack_run(fd_set *readfds) { - int ret; - struct nethdr *net = BUILD_NETMSG(obj->ct, type); - - list_del_init(i); - tx_list_len--; - - ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - - return ret; + if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) + queue_iterate(tx_queue, NULL, tx_queue_xmit); } -static void notrack_run(void) +static int notrack_register_fds(struct fds *fds) { - struct cache_notrack *cn, *tmp; - - /* send messages in the tx_queue */ - queue_iterate(tx_queue, NULL, tx_queue_xmit); - - /* send conntracks in the tx_list */ - list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) { - struct cache_object *obj; - - obj = cache_data_get_object(STATE_SYNC(internal), cn); - tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD); - } + return register_fd(queue_get_eventfd(tx_queue), fds); } struct sync_mode sync_notrack = { @@ -201,4 +196,5 @@ struct sync_mode sync_notrack = { .local = notrack_local, .recv = notrack_recv, .run = notrack_run, + .register_fds = notrack_register_fds, }; -- cgit v1.2.3