From 96213d5f0821aee2fe52459ab2cd54569e50cf85 Mon Sep 17 00:00:00 2001 From: "/C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org" Date: Sat, 26 Apr 2008 16:07:00 +0000 Subject: rework of the FT-FW approach --- src/network.c | 43 ++++++---- src/queue.c | 2 +- src/sync-ftfw.c | 237 +++++++++++++++++++++++++++++++++++++++++++------------- src/sync-mode.c | 46 +++++------ 4 files changed, 233 insertions(+), 95 deletions(-) (limited to 'src') diff --git a/src/network.c b/src/network.c index 92999a1..d7ab415 100644 --- a/src/network.c +++ b/src/network.c @@ -33,13 +33,14 @@ static size_t __do_send(struct mcast_sock *m, void *data, size_t len) #undef _TEST_DROP #ifdef _TEST_DROP - static int drop = 0; - if (++drop >= 10) { +#define DROP_RATE .25 + + /* simulate message omission with a certain probability */ + if ((random() & 0x7FFFFFFF) < 0x80000000 * DROP_RATE) { printf("drop sq: %u fl:%u len:%u\n", ntohl(net->seq), ntohs(net->flags), ntohs(net->len)); - drop = 0; return 0; } #endif @@ -57,7 +58,6 @@ static size_t __do_prepare(struct mcast_sock *m, void *data, size_t len) if (!seq_set) { seq_set = 1; cur_seq = time(NULL); - net->flags |= NET_F_HELLO; } net->len = len; net->seq = cur_seq++; @@ -181,9 +181,6 @@ int handle_netmsg(struct nethdr *net) HDR_NETWORK2HOST(net); - if (IS_HELLO(net)) - STATE_SYNC(last_seq_recv) = net->seq - 1; - if (IS_CTL(net)) return 0; @@ -198,37 +195,51 @@ int handle_netmsg(struct nethdr *net) return 0; } +static int local_seq_set = 0; + +/* this function only tracks, it does not update the last sequence received */ int mcast_track_seq(uint32_t seq, uint32_t *exp_seq) { - static int local_seq_set = 0; - int ret = 1; + int ret = SEQ_UNKNOWN; /* netlink sequence tracking initialization */ if (!local_seq_set) { - local_seq_set = 1; + ret = SEQ_UNSET; goto out; } /* fast path: we received the correct sequence */ - if (seq == STATE_SYNC(last_seq_recv)+1) + if (seq == STATE_SYNC(last_seq_recv)+1) { + ret = SEQ_IN_SYNC; goto out; + } /* out of sequence: some messages got lost */ if (after(seq, STATE_SYNC(last_seq_recv)+1)) { STATE_SYNC(packets_lost) += seq-STATE_SYNC(last_seq_recv)+1; - ret = 0; + ret = SEQ_AFTER; goto out; } /* out of sequence: replayed/delayed packet? */ if (before(seq, STATE_SYNC(last_seq_recv)+1)) - dlog(LOG_WARNING, "delayed packet? exp=%u rcv=%u", - STATE_SYNC(last_seq_recv)+1, seq); + ret = SEQ_BEFORE; out: *exp_seq = STATE_SYNC(last_seq_recv)+1; - /* update expected sequence */ - STATE_SYNC(last_seq_recv) = seq; return ret; } + +void mcast_track_update_seq(uint32_t seq) +{ + if (!local_seq_set) + local_seq_set = 1; + + STATE_SYNC(last_seq_recv) = seq; +} + +int mcast_track_is_seq_set() +{ + return local_seq_set; +} diff --git a/src/queue.c b/src/queue.c index 7b20e83..cdd70ae 100644 --- a/src/queue.c +++ b/src/queue.c @@ -93,7 +93,7 @@ retry: goto err; } - list_add(&n->head, &b->head); + list_add_tail(&n->head, &b->head); b->cur_size += size; b->num_elems++; diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index cac25d0..0b98513 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -34,11 +34,26 @@ #define dp(...) #endif +#if 0 +#define dprint printf +#else +#define dprint(...) +#endif + static LIST_HEAD(rs_list); static LIST_HEAD(tx_list); +static unsigned int rs_list_len; static unsigned int tx_list_len; static struct queue *rs_queue; static struct queue *tx_queue; +static uint32_t exp_seq; +static uint32_t window; +static uint32_t ack_from; +static int ack_from_set = 0; +static struct alarm_block alive_alarm; + +/* XXX: alive message expiration configurable */ +#define ALIVE_INT 1 struct cache_ftfw { struct list_head rs_list; @@ -64,6 +79,7 @@ static void cache_ftfw_del(struct us_conntrack *u, void *data) /* no need for list_del_init since the entry is destroyed */ list_del(&cn->rs_list); + rs_list_len--; } static struct cache_extra cache_ftfw_extra = { @@ -83,15 +99,57 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) queue_add(tx_queue, &ack, NETHDR_ACK_SIZ); } -static struct alarm_block alive_alarm; +static void ftfw_run(void); +/* this function is called from the alarm framework */ static void do_alive_alarm(struct alarm_block *a, void *data) { - tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); + if (ack_from_set && mcast_track_is_seq_set()) { + /* exp_seq contains the last update received */ + dprint("send ALIVE ACK (from=%u, to=%u)\n", + ack_from, STATE_SYNC(last_seq_recv)); + tx_queue_add_ctlmsg(NET_F_ACK, + ack_from, + STATE_SYNC(last_seq_recv)); + ack_from_set = 0; + } else + tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); + + /* TODO: no need for buffered send, extracted from run_sync() */ + ftfw_run(); + mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); +} - add_alarm(&alive_alarm, 1, 0); +#undef _SIGNAL_DEBUG +#ifdef _SIGNAL_DEBUG + +static int rs_dump(void *data1, const void *data2) +{ + struct nethdr_ack *net = data1; + + dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags); + + return 0; } +#include + +static void my_dump(int foo) +{ + struct cache_ftfw *cn, *tmp; + + list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { + struct us_conntrack *u; + + u = cache_get_conntrack(STATE_SYNC(internal), cn); + dprint("in RS list -> seq:%u\n", cn->seq); + } + + queue_iterate(rs_queue, NULL, rs_dump); +} + +#endif + static int ftfw_init(void) { tx_queue = queue_create(CONFIG(resend_queue_size)); @@ -106,12 +164,15 @@ static int ftfw_init(void) return -1; } - INIT_LIST_HEAD(&tx_list); - INIT_LIST_HEAD(&rs_list); - - /* XXX: alive message expiration configurable */ init_alarm(&alive_alarm, NULL, do_alive_alarm); - add_alarm(&alive_alarm, 1, 0); + add_alarm(&alive_alarm, ALIVE_INT, 0); + + /* set ack window size */ + window = CONFIG(window_size); + +#ifdef _SIGNAL_DEBUG + signal(SIGUSR1, my_dump); +#endif return 0; } @@ -128,7 +189,7 @@ static int do_cache_to_tx(void *data1, void *data2) struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u); /* add to tx list */ - list_add(&cn->tx_list, &tx_list); + list_add_tail(&cn->tx_list, &tx_list); tx_list_len++; return 0; @@ -157,13 +218,14 @@ static int ftfw_local(int fd, int type, void *data) static int rs_queue_to_tx(void *data1, const void *data2) { - struct nethdr *net = data1; + struct nethdr_ack *net = data1; const struct nethdr_ack *nack = data2; if (between(net->seq, nack->from, nack->to)) { dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", net->seq, net->flags, net->len); queue_add(tx_queue, net, net->len); + queue_del(rs_queue, net); } return 0; } @@ -182,18 +244,20 @@ static int rs_queue_empty(void *data1, const void *data2) static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) { - struct cache_ftfw *cn; + struct cache_ftfw *cn, *tmp; - list_for_each_entry(cn, &rs_list, rs_list) { + list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) { struct us_conntrack *u; u = cache_get_conntrack(STATE_SYNC(internal), cn); if (between(cn->seq, from, to)) { dp("resending nack'ed (oldseq=%u)\n", cn->seq); - list_add(&cn->tx_list, &tx_list); + list_del_init(&cn->rs_list); + rs_list_len--; + list_add_tail(&cn->tx_list, &tx_list); tx_list_len++; - } - } + } + } } static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) @@ -207,54 +271,113 @@ static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) if (between(cn->seq, from, to)) { dp("queue: deleting from queue (seq=%u)\n", cn->seq); list_del_init(&cn->rs_list); - } + rs_list_len--; + } } } -static int ftfw_recv(const struct nethdr *net) +static int digest_msg(const struct nethdr *net) { - static unsigned int window = 0; - unsigned int exp_seq; + if (IS_DATA(net)) + return MSG_DATA; - if (window == 0) - window = CONFIG(window_size); + else if (IS_ACK(net)) { + const struct nethdr_ack *h = (const struct nethdr_ack *) net; - if (!mcast_track_seq(net->seq, &exp_seq)) { - dp("OOS: sending nack (seq=%u)\n", exp_seq); - tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); - window = CONFIG(window_size); - } else { - /* received a window, send an acknowledgement */ - if (--window == 0) { - dp("sending ack (seq=%u)\n", net->seq); - tx_queue_add_ctlmsg(NET_F_ACK, - net->seq - CONFIG(window_size), - net->seq); - } - } + dprint("ACK(%u): from seq=%u to seq=%u\n", + h->seq, h->from, h->to); + rs_list_empty(STATE_SYNC(internal), h->from, h->to); + queue_iterate(rs_queue, h, rs_queue_empty); + return MSG_CTL; - if (IS_NACK(net)) { + } else if (IS_NACK(net)) { const struct nethdr_ack *nack = (const struct nethdr_ack *) net; - dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to); + dprint("NACK(%u): from seq=%u to seq=%u\n", + nack->seq, nack->from, nack->to); rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); queue_iterate(rs_queue, nack, rs_queue_to_tx); - return 1; + return MSG_CTL; + } else if (IS_RESYNC(net)) { dp("RESYNC ALL\n"); cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); - return 1; - } else if (IS_ACK(net)) { - const struct nethdr_ack *h = (const struct nethdr_ack *) net; + return MSG_CTL; - dp("ACK: from seq=%u to seq=%u\n", h->from, h->to); - rs_list_empty(STATE_SYNC(internal), h->from, h->to); - queue_iterate(rs_queue, h, rs_queue_empty); - return 1; } else if (IS_ALIVE(net)) - return 1; + return MSG_CTL; - return 0; + return MSG_BAD; +} + +static int ftfw_recv(const struct nethdr *net) +{ + int ret = MSG_DATA; + + switch (mcast_track_seq(net->seq, &exp_seq)) { + case SEQ_AFTER: + ret = digest_msg(net); + if (ret == MSG_BAD) { + ret = MSG_BAD; + goto out; + } + + if (ack_from_set) { + tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1); + dprint("OFS send half ACK: from seq=%u to seq=%u\n", + ack_from, exp_seq-1); + ack_from_set = 0; + } + + tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); + dprint("OFS send NACK: from seq=%u to seq=%u\n", + exp_seq, net->seq-1); + + /* count this message as part of the new window */ + window = CONFIG(window_size) - 1; + ack_from = net->seq; + ack_from_set = 1; + break; + + case SEQ_BEFORE: + /* we don't accept delayed packets */ + dlog(LOG_WARNING, "Received seq=%u before expected seq=%u", + net->seq, exp_seq); + dlog(LOG_WARNING, "Probably the other node has come back" + "to life but you forgot to add " + "conntrackd -r to your scripts"); + ret = MSG_DROP; + break; + + case SEQ_UNSET: + case SEQ_IN_SYNC: + ret = digest_msg(net); + if (ret == MSG_BAD) { + ret = MSG_BAD; + goto out; + } + + if (!ack_from_set) { + ack_from_set = 1; + ack_from = net->seq; + } + + if (--window <= 0) { + /* received a window, send an acknowledgement */ + dprint("OFS send ACK: from seq=%u to seq=%u\n", + ack_from, net->seq); + + tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq); + window = CONFIG(window_size); + ack_from_set = 0; + } + } + +out: + if ((ret == MSG_DATA || ret == MSG_CTL)) + mcast_track_update_seq(net->seq); + + return ret; } static void ftfw_send(struct nethdr *net, struct us_conntrack *u) @@ -270,11 +393,14 @@ static void ftfw_send(struct nethdr *net, struct us_conntrack *u) cn = (struct cache_ftfw *) cache_get_extra(STATE_SYNC(internal), u); - if (!list_empty(&cn->rs_list)) - list_del(&cn->rs_list); + if (!list_empty(&cn->rs_list)) { + list_del_init(&cn->rs_list); + rs_list_len--; + } cn->seq = net->seq; - list_add(&cn->rs_list, &rs_list); + list_add_tail(&cn->rs_list, &rs_list); + rs_list_len++; break; case NFCT_Q_DESTROY: queue_add(rs_queue, net, net->len); @@ -294,7 +420,7 @@ static int tx_queue_xmit(void *data1, const void *data2) HDR_NETWORK2HOST(net); if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) { - dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n", + dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n", net->seq, net->flags, net->len); queue_add(rs_queue, net, net->len); } @@ -317,8 +443,7 @@ static int tx_list_xmit(struct list_head *i, struct us_conntrack *u) tx_list_len--; ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); - if (STATE_SYNC(sync)->send) - STATE_SYNC(sync)->send(net, u); + ftfw_send(net, u); return ret; } @@ -337,6 +462,14 @@ static void ftfw_run(void) u = cache_get_conntrack(STATE_SYNC(internal), cn); tx_list_xmit(&cn->tx_list, u); } + + /* reset alive alarm */ + add_alarm(&alive_alarm, 1, 0); + + dprint("tx_list_len:%u tx_queue_len:%u " + "rs_list_len: %u rs_queue_len:%u\n", + tx_list_len, queue_len(tx_queue), + rs_list_len, queue_len(rs_queue)); } struct sync_mode sync_ftfw = { diff --git a/src/sync-mode.c b/src/sync-mode.c index 3851e4a..cbb4769 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -42,8 +42,18 @@ static void do_mcast_handler_step(struct nethdr *net) struct nf_conntrack *ct = (struct nf_conntrack *)(void*) __ct; struct us_conntrack *u; - if (STATE_SYNC(sync)->recv(net)) - return; + switch (STATE_SYNC(sync)->recv(net)) { + case MSG_DATA: + break; + case MSG_DROP: + case MSG_CTL: + return; + case MSG_BAD: + STATE(malformed)++; + return; + default: + break; + } memset(ct, 0, sizeof(__ct)); @@ -211,14 +221,15 @@ static int register_fds_sync(struct fds *fds) static void run_sync(fd_set *readfds) { /* multicast packet has been received */ - if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) + if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) { mcast_handler(); - if (STATE_SYNC(sync)->run) - STATE_SYNC(sync)->run(); + if (STATE_SYNC(sync)->run) + STATE_SYNC(sync)->run(); - /* flush pending messages */ - mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); + /* flush pending messages */ + mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); + } } static void kill_sync(void) @@ -358,16 +369,8 @@ static int purge_step(void *data1, void *data2) ret = nfct_query(h, NFCT_Q_GET, u->ct); if (ret == -1 && errno == ENOENT) { - size_t len; - struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_DESTROY); - debug_ct(u->ct, "overrun purge resync"); - - len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); - if (STATE_SYNC(sync)->send) - STATE_SYNC(sync)->send(net, u); - + mcast_send_sync(u, u->ct, NFCT_Q_DESTROY); cache_del(STATE_SYNC(internal), u->ct); } @@ -402,16 +405,8 @@ static int overrun_sync(enum nf_conntrack_msg_type type, if (!cache_test(STATE_SYNC(internal), ct)) { if ((u = cache_update_force(STATE_SYNC(internal), ct))) { - size_t len; - debug_ct(u->ct, "overrun resync"); - - struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); - len = prepare_send_netmsg(STATE_SYNC(mcast_client),net); - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), - net, len); - if (STATE_SYNC(sync)->send) - STATE_SYNC(sync)->send(net, u); + mcast_send_sync(u, u->ct, NFCT_Q_UPDATE); } } @@ -437,7 +432,6 @@ retry: } else { if (errno == EEXIST) { cache_del(STATE_SYNC(internal), ct); - mcast_send_sync(NULL, ct, NFCT_Q_DESTROY); goto retry; } -- cgit v1.2.3