From 786f37040cdcb64b24eb0b437307ed5e208f717f Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Sat, 17 Jan 2009 17:54:57 +0100 Subject: sync: add generic tx_queue for all synchronization modes This patch adds a generic tx queue for all synchronization modes. Signed-off-by: Pablo Neira Ayuso --- src/sync-ftfw.c | 37 +++++++++++-------------------------- src/sync-mode.c | 16 ++++++++++++---- src/sync-notrack.c | 37 +++++-------------------------------- 3 files changed, 28 insertions(+), 62 deletions(-) (limited to 'src') diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index 565a4bc..d544a7b 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -34,7 +34,6 @@ #define dp(...) #endif -struct queue *tx_queue; struct queue *rs_queue; static uint32_t exp_seq; static uint32_t window; @@ -108,7 +107,7 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) ack->from = from; ack->to = to; - queue_add(tx_queue, &qobj->qnode); + queue_add(STATE_SYNC(tx_queue), &qobj->qnode); } static void tx_queue_add_ctlmsg2(uint32_t flags) @@ -124,7 +123,7 @@ static void tx_queue_add_ctlmsg2(uint32_t flags) ctl->type = NET_T_CTL; ctl->flags = flags; - queue_add(tx_queue, &qobj->qnode); + queue_add(STATE_SYNC(tx_queue), &qobj->qnode); } /* this function is called from the alarm framework */ @@ -144,11 +143,6 @@ static void do_alive_alarm(struct alarm_block *a, void *data) static int ftfw_init(void) { - tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD); - if (tx_queue == NULL) { - dlog(LOG_ERR, "cannot create tx queue"); - return -1; - } rs_queue = queue_create(INT_MAX, 0); if (rs_queue == NULL) { dlog(LOG_ERR, "cannot create rs queue"); @@ -167,7 +161,6 @@ static int ftfw_init(void) static void ftfw_kill(void) { queue_destroy(rs_queue); - queue_destroy(tx_queue); } static int do_cache_to_tx(void *data1, void *data2) @@ -178,7 +171,7 @@ static int do_cache_to_tx(void *data1, void *data2) if (queue_in(rs_queue, &cn->qnode)) queue_del(&cn->qnode); - queue_add(tx_queue, &cn->qnode); + queue_add(STATE_SYNC(tx_queue), &cn->qnode); return 0; } @@ -259,7 +252,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data) net->seq, net->flags, net->len); queue_del(n); - queue_add(tx_queue, n); + queue_add(STATE_SYNC(tx_queue), n); break; } case Q_ELEM_OBJ: { @@ -274,7 +267,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data) dp("resending nack'ed (oldseq=%u)\n", cn->seq); queue_del(n); - queue_add(tx_queue, n); + queue_add(STATE_SYNC(tx_queue), n); break; } } @@ -526,19 +519,12 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) return 0; } -static void ftfw_run(fd_set *readfds) +static void ftfw_xmit(void) { - if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) { - queue_iterate(tx_queue, NULL, tx_queue_xmit); - add_alarm(&alive_alarm, 1, 0); - dp("tx_queue_len:%u rs_queue_len:%u\n", - queue_len(tx_queue), queue_len(rs_queue)); - } -} - -static int ftfw_register_fds(struct fds *fds) -{ - return register_fd(queue_get_eventfd(tx_queue), fds); + queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); + add_alarm(&alive_alarm, ALIVE_INT, 0); + dp("tx_queue_len:%u rs_queue_len:%u\n", + queue_len(tx_queue), queue_len(rs_queue)); } struct sync_mode sync_ftfw = { @@ -550,6 +536,5 @@ struct sync_mode sync_ftfw = { .local = ftfw_local, .recv = ftfw_recv, .send = ftfw_send, - .run = ftfw_run, - .register_fds = ftfw_register_fds, + .xmit = ftfw_xmit, }; diff --git a/src/sync-mode.c b/src/sync-mode.c index 711f71b..5ae9062 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -26,6 +26,7 @@ #include "fds.h" #include "event.h" #include "debug.h" +#include "queue.h" #include #include @@ -242,6 +243,12 @@ static int init_sync(void) return -1; } + STATE_SYNC(tx_queue) = queue_create(INT_MAX, QUEUE_F_EVFD); + if (STATE_SYNC(tx_queue) == NULL) { + dlog(LOG_ERR, "cannot create tx queue"); + return -1; + } + /* initialization of multicast sequence generation */ STATE_SYNC(last_seq_sent) = time(NULL); @@ -253,8 +260,8 @@ static int register_fds_sync(struct fds *fds) if (register_fd(STATE_SYNC(mcast_server->fd), fds) == -1) return -1; - if (STATE_SYNC(sync)->register_fds) - return STATE_SYNC(sync)->register_fds(fds); + if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), fds) == -1) + return -1; return 0; } @@ -265,8 +272,8 @@ static void run_sync(fd_set *readfds) if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) mcast_handler(); - if (STATE_SYNC(sync)->run) - STATE_SYNC(sync)->run(readfds); + if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds)) + STATE_SYNC(sync)->xmit(); /* flush pending messages */ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); @@ -281,6 +288,7 @@ static void kill_sync(void) mcast_client_destroy(STATE_SYNC(mcast_client)); mcast_buffered_destroy(); + queue_destroy(STATE_SYNC(tx_queue)); if (STATE_SYNC(sync)->kill) STATE_SYNC(sync)->kill(); diff --git a/src/sync-notrack.c b/src/sync-notrack.c index 40cc199..4ded298 100644 --- a/src/sync-notrack.c +++ b/src/sync-notrack.c @@ -27,8 +27,6 @@ #include -static struct queue *tx_queue; - struct cache_notrack { struct queue_node qnode; }; @@ -66,30 +64,14 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) ack->from = from; ack->to = to; - queue_add(tx_queue, &qobj->qnode); -} - -static int notrack_init(void) -{ - tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD); - if (tx_queue == NULL) { - dlog(LOG_ERR, "cannot create tx queue"); - return -1; - } - - return 0; -} - -static void notrack_kill(void) -{ - queue_destroy(tx_queue); + queue_add(STATE_SYNC(tx_queue), &qobj->qnode); } static int do_cache_to_tx(void *data1, void *data2) { struct cache_object *obj = data2; struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj); - queue_add(tx_queue, &cn->qnode); + queue_add(STATE_SYNC(tx_queue), &cn->qnode); return 0; } @@ -176,25 +158,16 @@ static int tx_queue_xmit(struct queue_node *n, const void *data2) return 0; } -static void notrack_run(fd_set *readfds) -{ - if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) - queue_iterate(tx_queue, NULL, tx_queue_xmit); -} - -static int notrack_register_fds(struct fds *fds) +static void notrack_xmit(void) { - return register_fd(queue_get_eventfd(tx_queue), fds); + queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); } struct sync_mode sync_notrack = { .internal_cache_flags = LIFETIME, .external_cache_flags = LIFETIME, .internal_cache_extra = &cache_notrack_extra, - .init = notrack_init, - .kill = notrack_kill, .local = notrack_local, .recv = notrack_recv, - .run = notrack_run, - .register_fds = notrack_register_fds, + .xmit = notrack_xmit, }; -- cgit v1.2.3