summaryrefslogtreecommitdiffstats
path: root/src/sync-nack.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-nack.c')
-rw-r--r--src/sync-nack.c260
1 files changed, 180 insertions, 80 deletions
diff --git a/src/sync-nack.c b/src/sync-nack.c
index 20ad1f4..dbda0a7 100644
--- a/src/sync-nack.c
+++ b/src/sync-nack.c
@@ -24,6 +24,7 @@
#include "buffer.h"
#include "debug.h"
#include "network.h"
+#include "alarm.h"
#include <libnfnetlink/libnfnetlink.h>
#include <libnetfilter_conntrack/libnetfilter_conntrack.h>
@@ -33,28 +34,34 @@
#define dp
#endif
-static LIST_HEAD(queue);
+static LIST_HEAD(rs_list);
+static LIST_HEAD(tx_list);
+static unsigned int tx_list_len;
+static struct buffer *rs_queue;
+static struct buffer *tx_queue;
struct cache_nack {
- struct list_head head;
+ struct list_head rs_list;
+ struct list_head tx_list;
u_int32_t seq;
};
static void cache_nack_add(struct us_conntrack *u, void *data)
{
struct cache_nack *cn = data;
- INIT_LIST_HEAD(&cn->head);
+ INIT_LIST_HEAD(&cn->rs_list);
+ INIT_LIST_HEAD(&cn->tx_list);
}
static void cache_nack_del(struct us_conntrack *u, void *data)
{
struct cache_nack *cn = data;
- if (cn->head.next == &cn->head &&
- cn->head.prev == &cn->head)
+ if (cn->rs_list.next == &cn->rs_list &&
+ cn->rs_list.prev == &cn->rs_list)
return;
- list_del(&cn->head);
+ list_del(&cn->rs_list);
}
static struct cache_extra cache_nack_extra = {
@@ -65,19 +72,31 @@ static struct cache_extra cache_nack_extra = {
static int nack_init()
{
- STATE_SYNC(buffer) = buffer_create(CONFIG(resend_buffer_size));
- if (STATE_SYNC(buffer) == NULL)
+ tx_queue = buffer_create(CONFIG(resend_buffer_size));
+ if (tx_queue == NULL) {
+ dlog(STATE(log), "[FAIL] cannot create tx buffer");
return -1;
+ }
+
+ rs_queue = buffer_create(CONFIG(resend_buffer_size));
+ if (rs_queue == NULL) {
+ dlog(STATE(log), "[FAIL] cannot create rs buffer");
+ return -1;
+ }
+
+ INIT_LIST_HEAD(&tx_list);
+ INIT_LIST_HEAD(&rs_list);
return 0;
}
static void nack_kill()
{
- buffer_destroy(STATE_SYNC(buffer));
+ buffer_destroy(rs_queue);
+ buffer_destroy(tx_queue);
}
-static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to)
+static void tx_queue_add_ctlmsg(u_int32_t flags, u_int32_t from, u_int32_t to)
{
struct nethdr_ack ack = {
.flags = flags,
@@ -85,8 +104,19 @@ static void mcast_send_control(u_int32_t flags, u_int32_t from, u_int32_t to)
.to = to,
};
- mcast_send_error(STATE_SYNC(mcast_client), &ack);
- buffer_add(STATE_SYNC(buffer), &ack, NETHDR_ACK_SIZ);
+ buffer_add(tx_queue, &ack, NETHDR_ACK_SIZ);
+}
+
+static int do_cache_to_tx(void *data1, void *data2)
+{
+ struct us_conntrack *u = data2;
+ struct cache_nack *cn = cache_get_extra(STATE_SYNC(internal), u);
+
+ /* add to tx list */
+ list_add(&cn->tx_list, &tx_list);
+ tx_list_len++;
+
+ return 0;
}
static int nack_local(int fd, int type, void *data)
@@ -94,85 +124,78 @@ static int nack_local(int fd, int type, void *data)
int ret = 1;
switch(type) {
- case REQUEST_DUMP:
- mcast_send_control(NET_F_RESYNC, 0, 0);
- dlog(STATE(log), "[REQ] request resync");
- break;
- default:
- ret = 0;
- break;
+ case REQUEST_DUMP:
+ dlog(STATE(log), "[REQ] request resync");
+ tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0);
+ break;
+ case SEND_BULK:
+ dlog(STATE(log), "[REQ] sending bulk update");
+ cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
+ break;
+ default:
+ ret = 0;
+ break;
}
return ret;
}
-static int buffer_compare(void *data1, void *data2)
+static int rs_queue_to_tx(void *data1, void *data2)
{
struct nethdr *net = data1;
struct nethdr_ack *nack = data2;
- struct nlmsghdr *nlh = data1 + NETHDR_SIZ;
-
- unsigned old_seq = ntohl(net->seq);
- if (between(ntohl(net->seq), nack->from, nack->to)) {
- if (mcast_resend_netmsg(STATE_SYNC(mcast_client), net))
- dp("resend destroy (old seq=%u) (seq=%u)\n",
- old_seq, ntohl(net->seq));
+ if (between(net->seq, nack->from, nack->to)) {
+ dp("rs_queue_to_tx sq: %u fl:%u len:%u\n",
+ net->seq, net->flags, net->len);
+ buffer_add(tx_queue, net, net->len);
}
return 0;
}
-static int buffer_remove(void *data1, void *data2)
+static int rs_queue_empty(void *data1, void *data2)
{
struct nethdr *net = data1;
struct nethdr_ack *h = data2;
- if (between(ntohl(net->seq), h->from, h->to)) {
- dp("remove from buffer (seq=%u)\n", ntohl(net->seq));
- __buffer_del(STATE_SYNC(buffer), data1);
+ if (between(net->seq, h->from, h->to)) {
+ dp("remove from buffer (seq=%u)\n", net->seq);
+ buffer_del(rs_queue, data1);
}
return 0;
}
-static void queue_resend(struct cache *c, unsigned int from, unsigned int to)
+static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to)
{
struct list_head *n;
struct us_conntrack *u;
- list_for_each(n, &queue) {
+ list_for_each(n, &rs_list) {
struct cache_nack *cn = (struct cache_nack *) n;
struct us_conntrack *u;
u = cache_get_conntrack(STATE_SYNC(internal), cn);
-
if (between(cn->seq, from, to)) {
- debug_ct(u->ct, "resend nack");
- dp("resending nack'ed (oldseq=%u) ", cn->seq);
-
- if (mcast_build_send_update(u) == -1)
- continue;
-
- dp("(newseq=%u)\n", cn->seq);
+ dp("resending nack'ed (oldseq=%u)\n", cn->seq);
+ list_add(&cn->tx_list, &tx_list);
+ tx_list_len++;
}
}
}
-static void queue_empty(struct cache *c, unsigned int from, unsigned int to)
+static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to)
{
struct list_head *n, *tmp;
- struct us_conntrack *u;
- dp("ACK from %u to %u\n", from, to);
- list_for_each_safe(n, tmp, &queue) {
+ list_for_each_safe(n, tmp, &rs_list) {
struct cache_nack *cn = (struct cache_nack *) n;
+ struct us_conntrack *u;
u = cache_get_conntrack(STATE_SYNC(internal), cn);
if (between(cn->seq, from, to)) {
- dp("remove %u\n", cn->seq);
- debug_ct(u->ct, "ack received: empty queue");
dp("queue: deleting from queue (seq=%u)\n", cn->seq);
- list_del(&cn->head);
- INIT_LIST_HEAD(&cn->head);
+ list_del(&cn->rs_list);
+ INIT_LIST_HEAD(&cn->rs_list);
}
}
}
@@ -187,73 +210,149 @@ static int nack_recv(const struct nethdr *net)
if (!mcast_track_seq(net->seq, &exp_seq)) {
dp("OOS: sending nack (seq=%u)\n", exp_seq);
- mcast_send_control(NET_F_NACK, exp_seq, net->seq - 1);
+ tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1);
window = CONFIG(window_size);
} else {
/* received a window, send an acknowledgement */
if (--window == 0) {
dp("sending ack (seq=%u)\n", net->seq);
- mcast_send_control(NET_F_ACK,
- net->seq - CONFIG(window_size),
- net->seq);
+ tx_queue_add_ctlmsg(NET_F_ACK,
+ net->seq - CONFIG(window_size),
+ net->seq);
}
}
- if (net->flags & NET_F_NACK) {
+ if (IS_NACK(net)) {
struct nethdr_ack *nack = (struct nethdr_ack *) net;
dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to);
- queue_resend(STATE_SYNC(internal), nack->from, nack->to);
- buffer_iterate(STATE_SYNC(buffer), nack, buffer_compare);
+ rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to);
+ buffer_iterate(rs_queue, nack, rs_queue_to_tx);
return 1;
- } else if (net->flags & NET_F_RESYNC) {
+ } else if (IS_RESYNC(net)) {
dp("RESYNC ALL\n");
- cache_bulk(STATE_SYNC(internal));
+ cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx);
return 1;
- } else if (net->flags & NET_F_ACK) {
+ } else if (IS_ACK(net)) {
struct nethdr_ack *h = (struct nethdr_ack *) net;
dp("ACK: from seq=%u to seq=%u\n", h->from, h->to);
- queue_empty(STATE_SYNC(internal), h->from, h->to);
- buffer_iterate(STATE_SYNC(buffer), h, buffer_remove);
+ rs_list_empty(STATE_SYNC(internal), h->from, h->to);
+ buffer_iterate(rs_queue, h, rs_queue_empty);
+ return 1;
+ } else if (IS_ALIVE(net))
return 1;
- }
return 0;
}
-static void nack_send(int type,
- const struct nethdr *net,
- struct us_conntrack *u)
+static void nack_send(struct nethdr *net, struct us_conntrack *u)
{
- int size = NETHDR_SIZ;
- struct nlmsghdr *nlh = (struct nlmsghdr *) ((void *) net + size);
+ struct netpld *pld = NETHDR_DATA(net);
struct cache_nack *cn;
-
- size += ntohl(nlh->nlmsg_len);
- switch(type) {
- case NFCT_T_NEW:
- case NFCT_T_UPDATE:
+ HDR_NETWORK2HOST(net);
+
+ switch(ntohs(pld->query)) {
+ case NFCT_Q_CREATE:
+ case NFCT_Q_UPDATE:
cn = (struct cache_nack *)
cache_get_extra(STATE_SYNC(internal), u);
- if (cn->head.next == &cn->head &&
- cn->head.prev == &cn->head)
+ if (cn->rs_list.next == &cn->rs_list &&
+ cn->rs_list.prev == &cn->rs_list)
goto insert;
- list_del(&cn->head);
- INIT_LIST_HEAD(&cn->head);
+ list_del(&cn->rs_list);
+ INIT_LIST_HEAD(&cn->rs_list);
insert:
- cn->seq = ntohl(net->seq);
- list_add(&cn->head, &queue);
+ cn->seq = net->seq;
+ list_add(&cn->rs_list, &rs_list);
break;
- case NFCT_T_DESTROY:
- buffer_add(STATE_SYNC(buffer), net, size);
+ case NFCT_Q_DESTROY:
+ buffer_add(rs_queue, net, net->len);
break;
}
}
+static int tx_queue_xmit(void *data1, void *data2)
+{
+ struct nethdr *net = data1;
+ int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+
+ dp("tx_queue sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), ntohs(net->flags), ntohs(net->len));
+
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
+ HDR_NETWORK2HOST(net);
+
+ if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) {
+ dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n",
+ net->seq, net->flags, net->len);
+ buffer_add(rs_queue, net, net->len);
+ }
+ buffer_del(tx_queue, net);
+
+ return 0;
+}
+
+static int tx_list_xmit(struct list_head *i, struct us_conntrack *u)
+{
+ int ret;
+ struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
+ int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+
+ dp("tx_list sq: %u fl:%u len:%u\n",
+ ntohl(net->seq), ntohs(net->flags),
+ ntohs(net->len));
+
+ list_del(i);
+ INIT_LIST_HEAD(i);
+ tx_list_len--;
+
+ ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
+ if (STATE_SYNC(sync)->send)
+ STATE_SYNC(sync)->send(net, u);
+
+ return ret;
+}
+
+static struct alarm_list alive_alarm;
+
+static void do_alive_alarm(struct alarm_list *a, void *data)
+{
+ del_alarm(a);
+ tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0);
+}
+
+static void nack_run(int step)
+{
+ struct list_head *i, *tmp;
+
+ /* send messages in the tx_queue */
+ buffer_iterate(tx_queue, NULL, tx_queue_xmit);
+
+ /* send conntracks in the tx_list */
+ list_for_each_safe(i, tmp, &tx_list) {
+ struct cache_nack *cn;
+ struct us_conntrack *u;
+
+ cn = container_of(i, struct cache_nack, tx_list);
+ u = cache_get_conntrack(STATE_SYNC(internal), cn);
+ tx_list_xmit(i, u);
+ }
+
+ if (alive_alarm.expires > 0)
+ mod_alarm(&alive_alarm, 1);
+ else {
+ init_alarm(&alive_alarm);
+ /* XXX: alive message expiration configurable */
+ set_alarm_expiration(&alive_alarm, 1);
+ set_alarm_function(&alive_alarm, do_alive_alarm);
+ add_alarm(&alive_alarm);
+ }
+}
+
struct sync_mode nack = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = LIFETIME,
@@ -263,4 +362,5 @@ struct sync_mode nack = {
.local = nack_local,
.recv = nack_recv,
.send = nack_send,
+ .run = nack_run,
};