From e2af183ea7e5ea35a1582f40a01a7c49e83b31be Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Thu, 15 Jan 2009 23:19:58 +0100 Subject: sync: unify tx_list and tx_queue into one single tx_queue This patch unifies the tx_list and the tx_queue to have only one transmission queue. Since the tx_list hold state objects and tx_queue control messages, I have introduced a queue node type that can be used to differenciate the kind of information that the node stores: object or control message. This patch also reworks the existing queue class to include a file descriptor that can be used to know if there are new data added to the queue (see QUEUE_F_EVFD flag). In this change, I have also modified the current evfd to make the file descriptor to make read operations non-blocking. Moreover, it keeps a counter that is used to know how many messages are inserted in the queue. Signed-off-by: Pablo Neira Ayuso --- src/sync-notrack.c | 114 ++++++++++++++++++++++++++--------------------------- 1 file changed, 55 insertions(+), 59 deletions(-) (limited to 'src/sync-notrack.c') diff --git a/src/sync-notrack.c b/src/sync-notrack.c index 2d3783e..40cc199 100644 --- a/src/sync-notrack.c +++ b/src/sync-notrack.c @@ -23,32 +23,26 @@ #include "network.h" #include "log.h" #include "cache.h" -#include "event.h" +#include "fds.h" #include -static LIST_HEAD(tx_list); -static unsigned int tx_list_len; static struct queue *tx_queue; struct cache_notrack { - struct list_head tx_list; + struct queue_node qnode; }; static void cache_notrack_add(struct cache_object *obj, void *data) { struct cache_notrack *cn = data; - INIT_LIST_HEAD(&cn->tx_list); + queue_node_init(&cn->qnode, Q_ELEM_OBJ); } static void cache_notrack_del(struct cache_object *obj, void *data) { struct cache_notrack *cn = data; - - if (!list_empty(&cn->tx_list)) { - list_del(&cn->tx_list); - tx_list_len--; - } + queue_del(&cn->qnode); } static struct cache_extra cache_notrack_extra = { @@ -59,20 +53,25 @@ static struct cache_extra cache_notrack_extra = { static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) { - struct nethdr_ack ack = { - .type = NET_T_CTL, - .flags = flags, - .from = from, - .to = to, - }; - - queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); - write_evfd(STATE_SYNC(evfd)); + struct queue_object *qobj; + struct nethdr_ack *ack; + + qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack)); + if (qobj == NULL) + return; + + ack = (struct nethdr_ack *)qobj->data; + ack->type = NET_T_CTL; + ack->flags = flags; + ack->from = from; + ack->to = to; + + queue_add(tx_queue, &qobj->qnode); } static int notrack_init(void) { - tx_queue = queue_create(~0U); + tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD); if (tx_queue == NULL) { dlog(LOG_ERR, "cannot create tx queue"); return -1; @@ -90,16 +89,7 @@ static int do_cache_to_tx(void *data1, void *data2) { struct cache_object *obj = data2; struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); - - if (!list_empty(&cn->tx_list)) - return 0; - - /* add to tx list */ - list_add_tail(&cn->tx_list, &tx_list); - tx_list_len++; - - write_evfd(STATE_SYNC(evfd)); - + queue_add(tx_queue, &cn->qnode); return 0; } @@ -152,44 +142,49 @@ static int notrack_recv(const struct nethdr *net) return ret; } -static int tx_queue_xmit(void *data1, const void *data2) +static int tx_queue_xmit(struct queue_node *n, const void *data2) { - struct nethdr *net = data1; - nethdr_set_ack(net); - HDR_HOST2NETWORK(net); - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - queue_del(tx_queue, net); + switch (n->type) { + case Q_ELEM_CTL: { + struct nethdr *net = queue_node_data(n); + if (IS_RESYNC(net)) + nethdr_set_ack(net); + else + nethdr_set_ctl(net); + HDR_HOST2NETWORK(net); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + queue_del(n); + queue_object_free((struct queue_object *)n); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; + struct cache_object *obj; + int type; + struct nethdr *net; + + cn = (struct cache_ftfw *)n; + obj = cache_data_get_object(STATE_SYNC(internal), cn); + type = object_status_to_network_type(obj->status);; + net = BUILD_NETMSG(obj->ct, type); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); + queue_del(n); + break; + } + } return 0; } -static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type) +static void notrack_run(fd_set *readfds) { - int ret; - struct nethdr *net = BUILD_NETMSG(obj->ct, type); - - list_del_init(i); - tx_list_len--; - - ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); - - return ret; + if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) + queue_iterate(tx_queue, NULL, tx_queue_xmit); } -static void notrack_run(void) +static int notrack_register_fds(struct fds *fds) { - struct cache_notrack *cn, *tmp; - - /* send messages in the tx_queue */ - queue_iterate(tx_queue, NULL, tx_queue_xmit); - - /* send conntracks in the tx_list */ - list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) { - struct cache_object *obj; - - obj = cache_data_get_object(STATE_SYNC(internal), cn); - tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD); - } + return register_fd(queue_get_eventfd(tx_queue), fds); } struct sync_mode sync_notrack = { @@ -201,4 +196,5 @@ struct sync_mode sync_notrack = { .local = notrack_local, .recv = notrack_recv, .run = notrack_run, + .register_fds = notrack_register_fds, }; -- cgit v1.2.3