diff options
Diffstat (limited to 'src/sync-nack.c')
-rw-r--r-- | src/sync-nack.c | 260 |
1 files changed, 180 insertions, 80 deletions
diff --git a/src/sync-nack.c b/src/sync-nack.c index 20ad1f4..dbda0a7 100644 --- a/src/sync-nack.c +++ b/src/sync-nack.c @@ -24,6 +24,7 @@ #include "buffer.h" #include "debug.h" #include "network.h" +#include "alarm.h" #include <libnfnetlink/libnfnetlink.h> #include <libnetfilter_conntrack/libnetfilter_conntrack.h> @@ -33,28 +34,34 @@ #define dp #endif -static LIST_HEAD(queue); +static LIST_HEAD(rs_list); +static LIST_HEAD(tx_list); +static unsigned int tx_list_len; +static struct buffer *rs_queue; +static struct buffer *tx_queue; struct cache_nack { - struct list_head head; + struct list_head rs_list; + struct list_head tx_list; u_int32_t seq; }; static void cache_nack_add(struct us_conntrack *u, void *data) { struct cache_nack *cn = data; - INIT_LIST_HEAD(&cn->head); + INIT_LIST_HEAD(&cn->rs_list); + INIT_LIST_HEAD(&cn->tx_list); } static void cache_nack_del(struct us_conntrack *u, void *data) { struct cache_nack *cn = data; - if (cn->head.next == &cn->head && - cn->head.prev == &cn->head) + if (cn->rs_list.next == &cn->rs_list && + cn->rs_list.prev == &cn->rs_list) return; - list_del(&cn->head); + list_del(&cn->rs_list); } static struct cache_extra cache_nack_extra = { @@ -65,19 +72,31 @@ static struct cache_extra cache_nack_extra = { static int nack_init() { - STATE_SYNC(buffer) = buffer_create(CONFIG(resend_buffer_size)); - if (STATE_SYNC(buffer) == NULL) + tx_queue = buffer_create(CONFIG(resend_buffer_size)); + if (tx_queue == NULL) { + dlog(STATE(log), "[FAIL] cannot create tx buffer"); return -1; + } + + rs_queue = buffer_create(CONFIG(resend_buffer_size)); + if (rs_queue == NULL) { + dlog(STATE(log), "[FAIL] cannot create rs buffer"); + return -1; + } + + INIT_LIST_HEAD(&tx_list); + INIT_LIST_HEAD(&rs_list); return 0; } static void nack_kill() { - buffer_destroy(STATE_SYNC(buffer)); + buffer_destroy(rs_queue); + buffer_destroy(tx_queue); } -static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to) +static void tx_queue_add_ctlmsg(u_int32_t flags, u_int32_t from, u_int32_t to) { struct nethdr_ack ack = { .flags = flags, @@ -85,8 +104,19 @@ static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to) .to = to, }; - mcast_send_error(STATE_SYNC(mcast_client), &ack); - buffer_add(STATE_SYNC(buffer), &ack, NETHDR_ACK_SIZ); + buffer_add(tx_queue, &ack, NETHDR_ACK_SIZ); +} + +static int do_cache_to_tx(void *data1, void *data2) +{ + struct us_conntrack *u = data2; + struct cache_nack *cn = cache_get_extra(STATE_SYNC(internal), u); + + /* add to tx list */ + list_add(&cn->tx_list, &tx_list); + tx_list_len++; + + return 0; } static int nack_local(int fd, int type, void *data) @@ -94,85 +124,78 @@ static int nack_local(int fd, int type, void *data) int ret = 1; switch(type) { - case REQUEST_DUMP: - mcast_send_control(NET_F_RESYNC, 0, 0); - dlog(STATE(log), "[REQ] request resync"); - break; - default: - ret = 0; - break; + case REQUEST_DUMP: + dlog(STATE(log), "[REQ] request resync"); + tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0); + break; + case SEND_BULK: + dlog(STATE(log), "[REQ] sending bulk update"); + cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); + break; + default: + ret = 0; + break; } return ret; } -static int buffer_compare(void *data1, void *data2) +static int rs_queue_to_tx(void *data1, void *data2) { struct nethdr *net = data1; struct nethdr_ack *nack = data2; - struct nlmsghdr *nlh = data1 + NETHDR_SIZ; - - unsigned old_seq = ntohl(net->seq); - if (between(ntohl(net->seq), nack->from, nack->to)) { - if (mcast_resend_netmsg(STATE_SYNC(mcast_client), net)) - dp("resend destroy (old seq=%u) (seq=%u)\n", - old_seq, ntohl(net->seq)); + if (between(net->seq, nack->from, nack->to)) { + dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", + net->seq, net->flags, net->len); + buffer_add(tx_queue, net, net->len); } return 0; } -static int buffer_remove(void *data1, void *data2) +static int rs_queue_empty(void *data1, void *data2) { struct nethdr *net = data1; struct nethdr_ack *h = data2; - if (between(ntohl(net->seq), h->from, h->to)) { - dp("remove from buffer (seq=%u)\n", ntohl(net->seq)); - __buffer_del(STATE_SYNC(buffer), data1); + if (between(net->seq, h->from, h->to)) { + dp("remove from buffer (seq=%u)\n", net->seq); + buffer_del(rs_queue, data1); } return 0; } -static void queue_resend(struct cache *c, unsigned int from, unsigned int to) +static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) { struct list_head *n; struct us_conntrack *u; - list_for_each(n, &queue) { + list_for_each(n, &rs_list) { struct cache_nack *cn = (struct cache_nack *) n; struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); - if (between(cn->seq, from, to)) { - debug_ct(u->ct, "resend nack"); - dp("resending nack'ed (oldseq=%u) ", cn->seq); - - if (mcast_build_send_update(u) == -1) - continue; - - dp("(newseq=%u)\n", cn->seq); + dp("resending nack'ed (oldseq=%u)\n", cn->seq); + list_add(&cn->tx_list, &tx_list); + tx_list_len++; } } } -static void queue_empty(struct cache *c, unsigned int from, unsigned int to) +static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) { struct list_head *n, *tmp; - struct us_conntrack *u; - dp("ACK from %u to %u\n", from, to); - list_for_each_safe(n, tmp, &queue) { + list_for_each_safe(n, tmp, &rs_list) { struct cache_nack *cn = (struct cache_nack *) n; + struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); if (between(cn->seq, from, to)) { - dp("remove %u\n", cn->seq); - debug_ct(u->ct, "ack received: empty queue"); dp("queue: deleting from queue (seq=%u)\n", cn->seq); - list_del(&cn->head); - INIT_LIST_HEAD(&cn->head); + list_del(&cn->rs_list); + INIT_LIST_HEAD(&cn->rs_list); } } } @@ -187,73 +210,149 @@ static int nack_recv(const struct nethdr *net) if (!mcast_track_seq(net->seq, &exp_seq)) { dp("OOS: sending nack (seq=%u)\n", exp_seq); - mcast_send_control(NET_F_NACK, exp_seq, net->seq - 1); + tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); window = CONFIG(window_size); } else { /* received a window, send an acknowledgement */ if (--window == 0) { dp("sending ack (seq=%u)\n", net->seq); - mcast_send_control(NET_F_ACK, - net->seq - CONFIG(window_size), - net->seq); + tx_queue_add_ctlmsg(NET_F_ACK, + net->seq - CONFIG(window_size), + net->seq); } } - if (net->flags & NET_F_NACK) { + if (IS_NACK(net)) { struct nethdr_ack *nack = (struct nethdr_ack *) net; dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to); - queue_resend(STATE_SYNC(internal), nack->from, nack->to); - buffer_iterate(STATE_SYNC(buffer), nack, buffer_compare); + rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); + buffer_iterate(rs_queue, nack, rs_queue_to_tx); return 1; - } else if (net->flags & NET_F_RESYNC) { + } else if (IS_RESYNC(net)) { dp("RESYNC ALL\n"); - cache_bulk(STATE_SYNC(internal)); + cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); return 1; - } else if (net->flags & NET_F_ACK) { + } else if (IS_ACK(net)) { struct nethdr_ack *h = (struct nethdr_ack *) net; dp("ACK: from seq=%u to seq=%u\n", h->from, h->to); - queue_empty(STATE_SYNC(internal), h->from, h->to); - buffer_iterate(STATE_SYNC(buffer), h, buffer_remove); + rs_list_empty(STATE_SYNC(internal), h->from, h->to); + buffer_iterate(rs_queue, h, rs_queue_empty); + return 1; + } else if (IS_ALIVE(net)) return 1; - } return 0; } -static void nack_send(int type, - const struct nethdr *net, - struct us_conntrack *u) +static void nack_send(struct nethdr *net, struct us_conntrack *u) { - int size = NETHDR_SIZ; - struct nlmsghdr *nlh = (struct nlmsghdr *) ((void *) net + size); + struct netpld *pld = NETHDR_DATA(net); struct cache_nack *cn; - - size += ntohl(nlh->nlmsg_len); - switch(type) { - case NFCT_T_NEW: - case NFCT_T_UPDATE: + HDR_NETWORK2HOST(net); + + switch(ntohs(pld->query)) { + case NFCT_Q_CREATE: + case NFCT_Q_UPDATE: cn = (struct cache_nack *) cache_get_extra(STATE_SYNC(internal), u); - if (cn->head.next == &cn->head && - cn->head.prev == &cn->head) + if (cn->rs_list.next == &cn->rs_list && + cn->rs_list.prev == &cn->rs_list) goto insert; - list_del(&cn->head); - INIT_LIST_HEAD(&cn->head); + list_del(&cn->rs_list); + INIT_LIST_HEAD(&cn->rs_list); insert: - cn->seq = ntohl(net->seq); - list_add(&cn->head, &queue); + cn->seq = net->seq; + list_add(&cn->rs_list, &rs_list); break; - case NFCT_T_DESTROY: - buffer_add(STATE_SYNC(buffer), net, size); + case NFCT_Q_DESTROY: + buffer_add(rs_queue, net, net->len); break; } } +static int tx_queue_xmit(void *data1, void *data2) +{ + struct nethdr *net = data1; + int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); + + dp("tx_queue sq: %u fl:%u len:%u\n", + ntohl(net->seq), ntohs(net->flags), ntohs(net->len)); + + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); + HDR_NETWORK2HOST(net); + + if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) { + dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n", + net->seq, net->flags, net->len); + buffer_add(rs_queue, net, net->len); + } + buffer_del(tx_queue, net); + + return 0; +} + +static int tx_list_xmit(struct list_head *i, struct us_conntrack *u) +{ + int ret; + struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); + int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); + + dp("tx_list sq: %u fl:%u len:%u\n", + ntohl(net->seq), ntohs(net->flags), + ntohs(net->len)); + + list_del(i); + INIT_LIST_HEAD(i); + tx_list_len--; + + ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); + if (STATE_SYNC(sync)->send) + STATE_SYNC(sync)->send(net, u); + + return ret; +} + +static struct alarm_list alive_alarm; + +static void do_alive_alarm(struct alarm_list *a, void *data) +{ + del_alarm(a); + tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); +} + +static void nack_run(int step) +{ + struct list_head *i, *tmp; + + /* send messages in the tx_queue */ + buffer_iterate(tx_queue, NULL, tx_queue_xmit); + + /* send conntracks in the tx_list */ + list_for_each_safe(i, tmp, &tx_list) { + struct cache_nack *cn; + struct us_conntrack *u; + + cn = container_of(i, struct cache_nack, tx_list); + u = cache_get_conntrack(STATE_SYNC(internal), cn); + tx_list_xmit(i, u); + } + + if (alive_alarm.expires > 0) + mod_alarm(&alive_alarm, 1); + else { + init_alarm(&alive_alarm); + /* XXX: alive message expiration configurable */ + set_alarm_expiration(&alive_alarm, 1); + set_alarm_function(&alive_alarm, do_alive_alarm); + add_alarm(&alive_alarm); + } +} + struct sync_mode nack = { .internal_cache_flags = LIFETIME, .external_cache_flags = LIFETIME, @@ -263,4 +362,5 @@ struct sync_mode nack = { .local = nack_local, .recv = nack_recv, .send = nack_send, + .run = nack_run, }; |