From e2af183ea7e5ea35a1582f40a01a7c49e83b31be Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Thu, 15 Jan 2009 23:19:58 +0100 Subject: sync: unify tx_list and tx_queue into one single tx_queue This patch unifies the tx_list and the tx_queue to have only one transmission queue. Since the tx_list hold state objects and tx_queue control messages, I have introduced a queue node type that can be used to differenciate the kind of information that the node stores: object or control message. This patch also reworks the existing queue class to include a file descriptor that can be used to know if there are new data added to the queue (see QUEUE_F_EVFD flag). In this change, I have also modified the current evfd to make the file descriptor to make read operations non-blocking. Moreover, it keeps a counter that is used to know how many messages are inserted in the queue. Signed-off-by: Pablo Neira Ayuso --- src/sync-ftfw.c | 367 +++++++++++++++++++++++++++----------------------------- 1 file changed, 176 insertions(+), 191 deletions(-) (limited to 'src/sync-ftfw.c') diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index bddc18c..bb53849 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -24,7 +24,7 @@ #include "alarm.h" #include "log.h" #include "cache.h" -#include "event.h" +#include "fds.h" #include @@ -34,12 +34,8 @@ #define dp(...) #endif -static LIST_HEAD(rs_list); -static LIST_HEAD(tx_list); -static unsigned int rs_list_len; -static unsigned int tx_list_len; -static struct queue *rs_queue; -static struct queue *tx_queue; +struct queue *tx_queue; +struct queue *rs_queue; static uint32_t exp_seq; static uint32_t window; static uint32_t ack_from; @@ -58,8 +54,7 @@ static int say_hello_back; #define ALIVE_INT 1 struct cache_ftfw { - struct list_head rs_list; - struct list_head tx_list; + struct queue_node qnode; uint32_t seq; }; @@ -67,24 +62,13 @@ static void cache_ftfw_add(struct cache_object *obj, void *data) { struct cache_ftfw *cn = data; /* These nodes are not inserted in the list */ - INIT_LIST_HEAD(&cn->rs_list); - INIT_LIST_HEAD(&cn->tx_list); + queue_node_init(&cn->qnode, Q_ELEM_OBJ); } static void cache_ftfw_del(struct cache_object *obj, void *data) { struct cache_ftfw *cn = data; - - /* this node is already out of the list */ - if (!list_empty(&cn->rs_list)) { - /* no need for list_del_init since the entry is destroyed */ - list_del(&cn->rs_list); - rs_list_len--; - } - if (!list_empty(&cn->tx_list)) { - list_del(&cn->tx_list); - tx_list_len--; - } + queue_del(&cn->qnode); } static struct cache_extra cache_ftfw_extra = { @@ -95,54 +79,64 @@ static struct cache_extra cache_ftfw_extra = { static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) { - struct nethdr_ack ack = { - .type = NET_T_CTL, - .flags = flags, - .from = from, - .to = to, - }; + struct queue_object *qobj; + struct nethdr_ack *ack; + + qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); + if (qobj == NULL) + return; + + ack = (struct nethdr_ack *)qobj->data; + ack->type = NET_T_CTL; + ack->flags = flags; + ack->from = from; + ack->to = to; switch(hello_state) { case HELLO_INIT: hello_state = HELLO_SAY; /* fall through */ case HELLO_SAY: - ack.flags |= NET_F_HELLO; + ack->flags |= NET_F_HELLO; break; } if (say_hello_back) { - ack.flags |= NET_F_HELLO_BACK; + ack->flags |= NET_F_HELLO_BACK; say_hello_back = 0; } - queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); - write_evfd(STATE_SYNC(evfd)); + queue_add(tx_queue, &qobj->qnode); } static void tx_queue_add_ctlmsg2(uint32_t flags) { - struct nethdr ctl = { - .type = NET_T_CTL, - .flags = flags, - }; + struct queue_object *qobj; + struct nethdr *ctl; + + qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); + if (qobj == NULL) + return; + + ctl = (struct nethdr *)qobj->data; + ctl->type = NET_T_CTL; + ctl->flags = flags; switch(hello_state) { case HELLO_INIT: hello_state = HELLO_SAY; /* fall through */ case HELLO_SAY: - ctl.flags |= NET_F_HELLO; + ctl->flags |= NET_F_HELLO; break; } if (say_hello_back) { - ctl.flags |= NET_F_HELLO_BACK; + ctl->flags |= NET_F_HELLO_BACK; say_hello_back = 0; } - queue_add(tx_queue, &ctl, NETHDR_SIZ); - write_evfd(STATE_SYNC(evfd)); + queue_add(tx_queue, &qobj->qnode); } /* this function is called from the alarm framework */ @@ -156,17 +150,18 @@ static void do_alive_alarm(struct alarm_block *a, void *data) ack_from_set = 0; } else tx_queue_add_ctlmsg2(NET_F_ALIVE); + + add_alarm(&alive_alarm, ALIVE_INT, 0); } static int ftfw_init(void) { - tx_queue = queue_create(CONFIG(resend_queue_size)); + tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD); if (tx_queue == NULL) { dlog(LOG_ERR, "cannot create tx queue"); return -1; } - - rs_queue = queue_create(CONFIG(resend_queue_size)); + rs_queue = queue_create(INT_MAX, 0); if (rs_queue == NULL) { dlog(LOG_ERR, "cannot create rs queue"); return -1; @@ -192,45 +187,47 @@ static int do_cache_to_tx(void *data1, void *data2) struct cache_object *obj = data2; struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), obj); - /* repeated request for resync? */ - if (!list_empty(&cn->tx_list)) - return 0; + if (queue_in(rs_queue, &cn->qnode)) + queue_del(&cn->qnode); - /* add to tx list */ - list_add_tail(&cn->tx_list, &tx_list); - tx_list_len++; - write_evfd(STATE_SYNC(evfd)); + queue_add(tx_queue, &cn->qnode); return 0; } -static int debug_rs_queue_dump_step(void *data1, const void *data2) +static int rs_queue_dump(struct queue_node *n, const void *data2) { - struct nethdr_ack *net = data1; const int *fd = data2; char buf[512]; int size; - size = sprintf(buf, "seq:%u flags:%u\n", net->seq, net->flags); + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr *net = queue_node_data(n); + size = sprintf(buf, "control -> seq:%u flags:%u\n", + net->seq, net->flags); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn = (struct cache_ftfw *) n; + size = sprintf(buf, "object -> seq:%u\n", cn->seq); + break; + } + default: + return 0; + } send(*fd, buf, size, 0); return 0; } static void debug_rs_dump(int fd) { - struct cache_ftfw *cn, *tmp; char buf[512]; int size; - size = sprintf(buf, "resent list (len=%u):\n", rs_list_len); - send(fd, buf, size, 0); - list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { - size = sprintf(buf, "seq:%u\n", cn->seq); - send(fd, buf, size, 0); - } - size = sprintf(buf, "\nresent queue (len=%u):\n", queue_len(rs_queue)); + size = sprintf(buf, "resent queue (len=%u):\n", queue_len(rs_queue)); send(fd, buf, size, 0); - queue_iterate(rs_queue, &fd, debug_rs_queue_dump_step); + queue_iterate(rs_queue, &fd, rs_queue_dump); } static int ftfw_local(int fd, int type, void *data) @@ -257,87 +254,84 @@ static int ftfw_local(int fd, int type, void *data) return ret; } -static int rs_queue_to_tx(void *data1, const void *data2) +static int rs_queue_to_tx(struct queue_node *n, const void *data) { - struct nethdr_ack *net = data1; - const struct nethdr_ack *nack = data2; - - if (before(net->seq, nack->from)) - return 0; /* continue */ - else if (after(net->seq, nack->to)) - return 1; /* break */ - - dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", - net->seq, net->flags, net->len); - queue_add(tx_queue, net, net->len); - write_evfd(STATE_SYNC(evfd)); - queue_del(rs_queue, net); - return 0; -} + const struct nethdr_ack *nack = data; -static int rs_queue_empty(void *data1, const void *data2) -{ - struct nethdr *net = data1; - const struct nethdr_ack *h = data2; + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr_ack *net = queue_node_data(n); - if (h == NULL) { - dp("inconditional remove from queue (seq=%u)\n", net->seq); - queue_del(rs_queue, data1); - return 0; + if (before(net->seq, nack->from)) + return 0; /* continue */ + else if (after(net->seq, nack->to)) + return 1; /* break */ + + dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", + net->seq, net->flags, net->len); + + queue_del(n); + queue_add(tx_queue, n); + break; } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; - if (before(net->seq, h->from)) - return 0; /* continue */ - else if (after(net->seq, h->to)) - return 1; /* break */ + cn = (struct cache_ftfw *) n; + if (before(cn->seq, nack->from)) + return 0; + else if (after(cn->seq, nack->to)) + return 1; - dp("remove from queue (seq=%u)\n", net->seq); - queue_del(rs_queue, data1); + dp("resending nack'ed (oldseq=%u)\n", cn->seq); + + queue_del(n); + queue_add(tx_queue, n); + break; + } + } return 0; } -static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) +static int rs_queue_empty(struct queue_node *n, const void *data) { - struct cache_ftfw *cn, *tmp; + const struct nethdr_ack *h = data; - list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { - struct cache_object *obj;; - - obj = cache_data_get_object(STATE_SYNC(internal), cn); - if (before(cn->seq, from)) - continue; - else if (after(cn->seq, to)) - break; + if (h == NULL) { + dp("inconditional remove from queue (seq=%u)\n", net->seq); + queue_del(n); + return 0; + } - dp("resending nack'ed (oldseq=%u)\n", cn->seq); - list_del_init(&cn->rs_list); - rs_list_len--; - /* we received a request for resync before this nack? */ - if (list_empty(&cn->tx_list)) { - list_add_tail(&cn->tx_list, &tx_list); - tx_list_len++; - } - write_evfd(STATE_SYNC(evfd)); - } -} + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr_ack *net = queue_node_data(n); -static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) -{ - struct cache_ftfw *cn, *tmp; + if (before(net->seq, h->from)) + return 0; /* continue */ + else if (after(net->seq, h->to)) + return 1; /* break */ - list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { - struct cache_object *obj; + dp("remove from queue (seq=%u)\n", net->seq); + queue_del(n); + queue_object_free((struct queue_object *)n); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; - obj = cache_data_get_object(STATE_SYNC(internal), cn); - if (before(cn->seq, from)) - continue; - else if (after(cn->seq, to)) - break; + cn = (struct cache_ftfw *) n; + if (before(cn->seq, h->from)) + return 0; + else if (after(cn->seq, h->to)) + return 1; dp("queue: deleting from queue (seq=%u)\n", cn->seq); - list_del_init(&cn->rs_list); - rs_list_len--; + queue_del(n); + break; } + } + return 0; } static int digest_msg(const struct nethdr *net) @@ -351,7 +345,6 @@ static int digest_msg(const struct nethdr *net) if (before(h->to, h->from)) return MSG_BAD; - rs_list_empty(STATE_SYNC(internal), h->from, h->to); queue_iterate(rs_queue, h, rs_queue_empty); return MSG_CTL; @@ -361,7 +354,6 @@ static int digest_msg(const struct nethdr *net) if (before(nack->to, nack->from)) return MSG_BAD; - rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); queue_iterate(rs_queue, nack, rs_queue_to_tx); return MSG_CTL; @@ -409,7 +401,6 @@ static int ftfw_recv(const struct nethdr *net) * know anything about that data, we are unreliable until * the helloing finishes */ queue_iterate(rs_queue, NULL, rs_queue_empty); - rs_list_empty(STATE_SYNC(internal), 0, ~0U); goto bypass; } @@ -480,10 +471,8 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj) cn = (struct cache_ftfw *) cache_get_extra(STATE_SYNC(internal), obj); - if (!list_empty(&cn->rs_list)) { - list_del_init(&cn->rs_list); - rs_list_len--; - } + if (queue_in(rs_queue, &cn->qnode)) + queue_del(&cn->qnode); switch(hello_state) { case HELLO_INIT: @@ -500,82 +489,77 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj) } cn->seq = ntohl(net->seq); - list_add_tail(&cn->rs_list, &rs_list); - rs_list_len++; + queue_add(rs_queue, &cn->qnode); break; } } -static int tx_queue_xmit(void *data1, const void *data2) +static int tx_queue_xmit(struct queue_node *n, const void *data) { - struct nethdr *net = data1; - - if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { - nethdr_set_ack(net); - } else if (IS_ALIVE(net)) { - nethdr_set_ctl(net); - } else { - STATE_SYNC(error).msg_snd_malformed++; - return 0; - } - HDR_HOST2NETWORK(net); - - dp("tx_queue sq: %u fl:%u len:%u\n", - ntohl(net->seq), net->flags, ntohs(net->len)); - - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - HDR_NETWORK2HOST(net); + switch(n->type) { + case Q_ELEM_CTL: { + struct nethdr *net = queue_node_data(n); + + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { + nethdr_set_ack(net); + } else if (IS_ALIVE(net)) { + nethdr_set_ctl(net); + } else { + STATE_SYNC(error).msg_snd_malformed++; + return 0; + } + HDR_HOST2NETWORK(net); - if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) - queue_add(rs_queue, net, net->len); + dp("tx_queue sq: %u fl:%u len:%u\n", + ntohl(net->seq), net->flags, ntohs(net->len)); - queue_del(tx_queue, net); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + HDR_NETWORK2HOST(net); - return 0; -} - -static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type) -{ - int ret; - struct nethdr *net = BUILD_NETMSG(obj->ct, type); + queue_del(n); + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) + queue_add(rs_queue, n); + else + queue_object_free((struct queue_object *)n); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; + struct cache_object *obj; + int type; + struct nethdr *net; - dp("tx_list sq: %u fl:%u len:%u\n", - ntohl(net->seq), net->flags, ntohs(net->len)); + cn = (struct cache_ftfw *)n; + obj = cache_data_get_object(STATE_SYNC(internal), cn); + type = object_status_to_network_type(obj->status); + net = BUILD_NETMSG(obj->ct, type); - list_del_init(i); - tx_list_len--; + dp("tx_list sq: %u fl:%u len:%u\n", + ntohl(net->seq), net->flags, ntohs(net->len)); - ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - ftfw_send(net, obj); + queue_del(n); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + ftfw_send(net, obj); + break; + } + } - return ret; + return 0; } -static void ftfw_run(void) +static void ftfw_run(fd_set *readfds) { - struct cache_ftfw *cn, *tmp; - - /* send messages in the tx_queue */ - queue_iterate(tx_queue, NULL, tx_queue_xmit); - - /* send conntracks in the tx_list */ - list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) { - struct cache_object *obj; - - obj = cache_data_get_object(STATE_SYNC(internal), cn); - if (alarm_pending(&obj->alarm)) - tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_DEL); - else - tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD); + if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) { + queue_iterate(tx_queue, NULL, tx_queue_xmit); + add_alarm(&alive_alarm, 1, 0); + dp("tx_queue_len:%u rs_queue_len:%u\n", + queue_len(tx_queue), queue_len(rs_queue)); } +} - /* reset alive alarm */ - add_alarm(&alive_alarm, 1, 0); - - dp("tx_list_len:%u tx_queue_len:%u " - "rs_list_len: %u rs_queue_len:%u\n", - tx_list_len, queue_len(tx_queue), - rs_list_len, queue_len(rs_queue)); +static int ftfw_register_fds(struct fds *fds) +{ + return register_fd(queue_get_eventfd(tx_queue), fds); } struct sync_mode sync_ftfw = { @@ -588,4 +572,5 @@ struct sync_mode sync_ftfw = { .recv = ftfw_recv, .send = ftfw_send, .run = ftfw_run, + .register_fds = ftfw_register_fds, }; -- cgit v1.2.3