From 786f37040cdcb64b24eb0b437307ed5e208f717f Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Sat, 17 Jan 2009 17:54:57 +0100 Subject: sync: add generic tx_queue for all synchronization modes This patch adds a generic tx queue for all synchronization modes. Signed-off-by: Pablo Neira Ayuso --- src/sync-ftfw.c | 37 +++++++++++-------------------------- 1 file changed, 11 insertions(+), 26 deletions(-) (limited to 'src/sync-ftfw.c') diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index 565a4bc..d544a7b 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -34,7 +34,6 @@ #define dp(...) #endif -struct queue *tx_queue; struct queue *rs_queue; static uint32_t exp_seq; static uint32_t window; @@ -108,7 +107,7 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) ack->from = from; ack->to = to; - queue_add(tx_queue, &qobj->qnode); + queue_add(STATE_SYNC(tx_queue), &qobj->qnode); } static void tx_queue_add_ctlmsg2(uint32_t flags) @@ -124,7 +123,7 @@ static void tx_queue_add_ctlmsg2(uint32_t flags) ctl->type = NET_T_CTL; ctl->flags = flags; - queue_add(tx_queue, &qobj->qnode); + queue_add(STATE_SYNC(tx_queue), &qobj->qnode); } /* this function is called from the alarm framework */ @@ -144,11 +143,6 @@ static void do_alive_alarm(struct alarm_block *a, void *data) static int ftfw_init(void) { - tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD); - if (tx_queue == NULL) { - dlog(LOG_ERR, "cannot create tx queue"); - return -1; - } rs_queue = queue_create(INT_MAX, 0); if (rs_queue == NULL) { dlog(LOG_ERR, "cannot create rs queue"); @@ -167,7 +161,6 @@ static int ftfw_init(void) static void ftfw_kill(void) { queue_destroy(rs_queue); - queue_destroy(tx_queue); } static int do_cache_to_tx(void *data1, void *data2) @@ -178,7 +171,7 @@ static int do_cache_to_tx(void *data1, void *data2) if (queue_in(rs_queue, &cn->qnode)) queue_del(&cn->qnode); - queue_add(tx_queue, &cn->qnode); + queue_add(STATE_SYNC(tx_queue), &cn->qnode); return 0; } @@ -259,7 +252,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data) net->seq, net->flags, net->len); queue_del(n); - queue_add(tx_queue, n); + queue_add(STATE_SYNC(tx_queue), n); break; } case Q_ELEM_OBJ: { @@ -274,7 +267,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data) dp("resending nack'ed (oldseq=%u)\n", cn->seq); queue_del(n); - queue_add(tx_queue, n); + queue_add(STATE_SYNC(tx_queue), n); break; } } @@ -526,19 +519,12 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) return 0; } -static void ftfw_run(fd_set *readfds) +static void ftfw_xmit(void) { - if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) { - queue_iterate(tx_queue, NULL, tx_queue_xmit); - add_alarm(&alive_alarm, 1, 0); - dp("tx_queue_len:%u rs_queue_len:%u\n", - queue_len(tx_queue), queue_len(rs_queue)); - } -} - -static int ftfw_register_fds(struct fds *fds) -{ - return register_fd(queue_get_eventfd(tx_queue), fds); + queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit); + add_alarm(&alive_alarm, ALIVE_INT, 0); + dp("tx_queue_len:%u rs_queue_len:%u\n", + queue_len(tx_queue), queue_len(rs_queue)); } struct sync_mode sync_ftfw = { @@ -550,6 +536,5 @@ struct sync_mode sync_ftfw = { .local = ftfw_local, .recv = ftfw_recv, .send = ftfw_send, - .run = ftfw_run, - .register_fds = ftfw_register_fds, + .xmit = ftfw_xmit, }; -- cgit v1.2.3