summaryrefslogtreecommitdiffstats
path: root/src/sync-ftfw.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-ftfw.c')
-rw-r--r--src/sync-ftfw.c237
1 files changed, 185 insertions, 52 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index cac25d0..0b98513 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -34,11 +34,26 @@
#define dp(...)
#endif
+#if 0
+#define dprint printf
+#else
+#define dprint(...)
+#endif
+
static LIST_HEAD(rs_list);
static LIST_HEAD(tx_list);
+static unsigned int rs_list_len;
static unsigned int tx_list_len;
static struct queue *rs_queue;
static struct queue *tx_queue;
+static uint32_t exp_seq;
+static uint32_t window;
+static uint32_t ack_from;
+static int ack_from_set = 0;
+static struct alarm_block alive_alarm;
+
+/* XXX: alive message expiration configurable */
+#define ALIVE_INT 1
struct cache_ftfw {
struct list_head rs_list;
@@ -64,6 +79,7 @@ static void cache_ftfw_del(struct us_conntrack *u, void *data)
/* no need for list_del_init since the entry is destroyed */
list_del(&cn->rs_list);
+ rs_list_len--;
}
static struct cache_extra cache_ftfw_extra = {
@@ -83,15 +99,57 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
}
-static struct alarm_block alive_alarm;
+static void ftfw_run(void);
+/* this function is called from the alarm framework */
static void do_alive_alarm(struct alarm_block *a, void *data)
{
- tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+ if (ack_from_set && mcast_track_is_seq_set()) {
+ /* exp_seq contains the last update received */
+ dprint("send ALIVE ACK (from=%u, to=%u)\n",
+ ack_from, STATE_SYNC(last_seq_recv));
+ tx_queue_add_ctlmsg(NET_F_ACK,
+ ack_from,
+ STATE_SYNC(last_seq_recv));
+ ack_from_set = 0;
+ } else
+ tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+
+ /* TODO: no need for buffered send, extracted from run_sync() */
+ ftfw_run();
+ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
+}
- add_alarm(&alive_alarm, 1, 0);
+#undef _SIGNAL_DEBUG
+#ifdef _SIGNAL_DEBUG
+
+static int rs_dump(void *data1, const void *data2)
+{
+ struct nethdr_ack *net = data1;
+
+ dprint("in RS queue -> seq:%u flags:%u\n", net->seq, net->flags);
+
+ return 0;
}
+#include <signal.h>
+
+static void my_dump(int foo)
+{
+ struct cache_ftfw *cn, *tmp;
+
+ list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
+ struct us_conntrack *u;
+
+ u = cache_get_conntrack(STATE_SYNC(internal), cn);
+ dprint("in RS list -> seq:%u\n", cn->seq);
+ }
+
+ queue_iterate(rs_queue, NULL, rs_dump);
+}
+
+#endif
+
static int ftfw_init(void)
{
tx_queue = queue_create(CONFIG(resend_queue_size));
@@ -106,12 +164,15 @@ static int ftfw_init(void)
return -1;
}
- INIT_LIST_HEAD(&tx_list);
- INIT_LIST_HEAD(&rs_list);
-
- /* XXX: alive message expiration configurable */
init_alarm(&alive_alarm, NULL, do_alive_alarm);
- add_alarm(&alive_alarm, 1, 0);
+ add_alarm(&alive_alarm, ALIVE_INT, 0);
+
+ /* set ack window size */
+ window = CONFIG(window_size);
+
+#ifdef _SIGNAL_DEBUG
+ signal(SIGUSR1, my_dump);
+#endif
return 0;
}
@@ -128,7 +189,7 @@ static int do_cache_to_tx(void *data1, void *data2)
struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u);
/* add to tx list */
- list_add(&cn->tx_list, &tx_list);
+ list_add_tail(&cn->tx_list, &tx_list);
tx_list_len++;
return 0;
@@ -157,13 +218,14 @@ static int ftfw_local(int fd, int type, void *data)
static int rs_queue_to_tx(void *data1, const void *data2)
{
- struct nethdr *net = data1;
+ struct nethdr_ack *net = data1;
const struct nethdr_ack *nack = data2;
if (between(net->seq, nack->from, nack->to)) {
dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
net->seq, net->flags, net->len);
queue_add(tx_queue, net, net->len);
+ queue_del(rs_queue, net);
}
return 0;
}
@@ -182,18 +244,20 @@ static int rs_queue_empty(void *data1, const void *data2)
static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
{
- struct cache_ftfw *cn;
+ struct cache_ftfw *cn, *tmp;
- list_for_each_entry(cn, &rs_list, rs_list) {
+ list_for_each_entry_safe(cn, tmp, &rs_list, rs_list) {
struct us_conntrack *u;
u = cache_get_conntrack(STATE_SYNC(internal), cn);
if (between(cn->seq, from, to)) {
dp("resending nack'ed (oldseq=%u)\n", cn->seq);
- list_add(&cn->tx_list, &tx_list);
+ list_del_init(&cn->rs_list);
+ rs_list_len--;
+ list_add_tail(&cn->tx_list, &tx_list);
tx_list_len++;
- }
- }
+ }
+ }
}
static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
@@ -207,54 +271,113 @@ static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
if (between(cn->seq, from, to)) {
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
list_del_init(&cn->rs_list);
- }
+ rs_list_len--;
+ }
}
}
-static int ftfw_recv(const struct nethdr *net)
+static int digest_msg(const struct nethdr *net)
{
- static unsigned int window = 0;
- unsigned int exp_seq;
+ if (IS_DATA(net))
+ return MSG_DATA;
- if (window == 0)
- window = CONFIG(window_size);
+ else if (IS_ACK(net)) {
+ const struct nethdr_ack *h = (const struct nethdr_ack *) net;
- if (!mcast_track_seq(net->seq, &exp_seq)) {
- dp("OOS: sending nack (seq=%u)\n", exp_seq);
- tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
- window = CONFIG(window_size);
- } else {
- /* received a window, send an acknowledgement */
- if (--window == 0) {
- dp("sending ack (seq=%u)\n", net->seq);
- tx_queue_add_ctlmsg(NET_F_ACK,
- net->seq - CONFIG(window_size),
- net->seq);
- }
- }
+ dprint("ACK(%u): from seq=%u to seq=%u\n",
+ h->seq, h->from, h->to);
+ rs_list_empty(STATE_SYNC(internal), h->from, h->to);
+ queue_iterate(rs_queue, h, rs_queue_empty);
+ return MSG_CTL;
- if (IS_NACK(net)) {
+ } else if (IS_NACK(net)) {
const struct nethdr_ack *nack = (const struct nethdr_ack *) net;
- dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to);
+ dprint("NACK(%u): from seq=%u to seq=%u\n",
+ nack->seq, nack->from, nack->to);
rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
queue_iterate(rs_queue, nack, rs_queue_to_tx);
- return 1;
+ return MSG_CTL;
+
} else if (IS_RESYNC(net)) {
dp("RESYNC ALL\n");
cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
- return 1;
- } else if (IS_ACK(net)) {
- const struct nethdr_ack *h = (const struct nethdr_ack *) net;
+ return MSG_CTL;
- dp("ACK: from seq=%u to seq=%u\n", h->from, h->to);
- rs_list_empty(STATE_SYNC(internal), h->from, h->to);
- queue_iterate(rs_queue, h, rs_queue_empty);
- return 1;
} else if (IS_ALIVE(net))
- return 1;
+ return MSG_CTL;
- return 0;
+ return MSG_BAD;
+}
+
+static int ftfw_recv(const struct nethdr *net)
+{
+ int ret = MSG_DATA;
+
+ switch (mcast_track_seq(net->seq, &exp_seq)) {
+ case SEQ_AFTER:
+ ret = digest_msg(net);
+ if (ret == MSG_BAD) {
+ ret = MSG_BAD;
+ goto out;
+ }
+
+ if (ack_from_set) {
+ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, exp_seq-1);
+ dprint("OFS send half ACK: from seq=%u to seq=%u\n",
+ ack_from, exp_seq-1);
+ ack_from_set = 0;
+ }
+
+ tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
+ dprint("OFS send NACK: from seq=%u to seq=%u\n",
+ exp_seq, net->seq-1);
+
+ /* count this message as part of the new window */
+ window = CONFIG(window_size) - 1;
+ ack_from = net->seq;
+ ack_from_set = 1;
+ break;
+
+ case SEQ_BEFORE:
+ /* we don't accept delayed packets */
+ dlog(LOG_WARNING, "Received seq=%u before expected seq=%u",
+ net->seq, exp_seq);
+ dlog(LOG_WARNING, "Probably the other node has come back"
+ "to life but you forgot to add "
+ "conntrackd -r to your scripts");
+ ret = MSG_DROP;
+ break;
+
+ case SEQ_UNSET:
+ case SEQ_IN_SYNC:
+ ret = digest_msg(net);
+ if (ret == MSG_BAD) {
+ ret = MSG_BAD;
+ goto out;
+ }
+
+ if (!ack_from_set) {
+ ack_from_set = 1;
+ ack_from = net->seq;
+ }
+
+ if (--window <= 0) {
+ /* received a window, send an acknowledgement */
+ dprint("OFS send ACK: from seq=%u to seq=%u\n",
+ ack_from, net->seq);
+
+ tx_queue_add_ctlmsg(NET_F_ACK, ack_from, net->seq);
+ window = CONFIG(window_size);
+ ack_from_set = 0;
+ }
+ }
+
+out:
+ if ((ret == MSG_DATA || ret == MSG_CTL))
+ mcast_track_update_seq(net->seq);
+
+ return ret;
}
static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
@@ -270,11 +393,14 @@ static void ftfw_send(struct nethdr *net, struct us_conntrack *u)
cn = (struct cache_ftfw *)
cache_get_extra(STATE_SYNC(internal), u);
- if (!list_empty(&cn->rs_list))
- list_del(&cn->rs_list);
+ if (!list_empty(&cn->rs_list)) {
+ list_del_init(&cn->rs_list);
+ rs_list_len--;
+ }
cn->seq = net->seq;
- list_add(&cn->rs_list, &rs_list);
+ list_add_tail(&cn->rs_list, &rs_list);
+ rs_list_len++;
break;
case NFCT_Q_DESTROY:
queue_add(rs_queue, net, net->len);
@@ -294,7 +420,7 @@ static int tx_queue_xmit(void *data1, const void *data2)
HDR_NETWORK2HOST(net);
if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
- dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n",
+ dprint("tx_queue -> to_rs_queue sq: %u fl:%u len:%u\n",
net->seq, net->flags, net->len);
queue_add(rs_queue, net, net->len);
}
@@ -317,8 +443,7 @@ static int tx_list_xmit(struct list_head *i, struct us_conntrack *u)
tx_list_len--;
ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
- if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(net, u);
+ ftfw_send(net, u);
return ret;
}
@@ -337,6 +462,14 @@ static void ftfw_run(void)
u = cache_get_conntrack(STATE_SYNC(internal), cn);
tx_list_xmit(&cn->tx_list, u);
}
+
+ /* reset alive alarm */
+ add_alarm(&alive_alarm, 1, 0);
+
+ dprint("tx_list_len:%u tx_queue_len:%u "
+ "rs_list_len: %u rs_queue_len:%u\n",
+ tx_list_len, queue_len(tx_queue),
+ rs_list_len, queue_len(rs_queue));
}
struct sync_mode sync_ftfw = {