summaryrefslogtreecommitdiffstats
path: root/src/sync-mode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r--src/sync-mode.c152
1 files changed, 89 insertions, 63 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 38ab016..f30cb95 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -27,43 +27,27 @@
#include <sys/select.h>
#include "sync.h"
#include "network.h"
+#include "buffer.h"
+#include "debug.h"
-/* handler for multicast messages received */
-static void mcast_handler()
+static void do_mcast_handler_step(struct nethdr *net)
{
- int ret;
- unsigned int type;
- char __net[4096];
- struct nethdr *net = (struct nethdr *) __net;
- struct nlmsghdr *nlh = (struct nlmsghdr *) (__net + NETHDR_SIZ);
+ unsigned int query;
+ struct netpld *pld = NETHDR_DATA(net);
char __ct[nfct_maxsize()];
struct nf_conntrack *ct = (struct nf_conntrack *) __ct;
struct us_conntrack *u = NULL;
- ret = mcast_recv_netmsg(STATE_SYNC(mcast_server), net, sizeof(__net));
- if (ret <= 0) {
- STATE(malformed)++;
- return;
- }
-
if (STATE_SYNC(sync)->recv(net))
return;
memset(ct, 0, sizeof(__ct));
- if ((type = parse_network_msg(ct, nlh)) == NFCT_T_ERROR) {
- STATE(malformed)++;
- return;
- }
+ /* XXX: check for malformed */
+ parse_netpld(ct, pld, &query);
- nfct_attr_unset(ct, ATTR_TIMEOUT);
- nfct_attr_unset(ct, ATTR_ORIG_COUNTER_BYTES);
- nfct_attr_unset(ct, ATTR_ORIG_COUNTER_PACKETS);
- nfct_attr_unset(ct, ATTR_REPL_COUNTER_BYTES);
- nfct_attr_unset(ct, ATTR_REPL_COUNTER_PACKETS);
-
- switch(type) {
- case NFCT_T_NEW:
+ switch(query) {
+ case NFCT_Q_CREATE:
retry:
if ((u = cache_add(STATE_SYNC(external), ct))) {
debug_ct(u->ct, "external new");
@@ -80,24 +64,57 @@ retry:
debug_ct(ct, "can't add");
}
break;
- case NFCT_T_UPDATE:
+ case NFCT_Q_UPDATE:
if ((u = cache_update_force(STATE_SYNC(external), ct))) {
debug_ct(u->ct, "external update");
} else
debug_ct(ct, "can't update");
break;
- case NFCT_T_DESTROY:
+ case NFCT_Q_DESTROY:
if (cache_del(STATE_SYNC(external), ct))
debug_ct(ct, "external destroy");
else
debug_ct(ct, "can't destroy");
break;
default:
- dlog(STATE(log), "mcast received unknown msg type %d\n", type);
+ dlog(STATE(log), "mcast received unknown query %d\n", query);
break;
}
}
+/* handler for multicast messages received */
+static void mcast_handler()
+{
+ int numbytes, remain;
+ char __net[4096], *ptr = __net;
+
+ numbytes = mcast_recv(STATE_SYNC(mcast_server), __net, sizeof(__net));
+ if (numbytes <= 0)
+ return;
+
+ remain = numbytes;
+ while (remain > 0) {
+ struct nethdr *net = (struct nethdr *) ptr;
+
+ if (ntohs(net->len) > remain) {
+ dlog(STATE(log), "fragmented messages");
+ break;
+ }
+
+ debug("recv sq: %u fl:%u len:%u (rem:%d)\n",
+ ntohl(net->seq), ntohs(net->flags),
+ ntohs(net->len), remain);
+
+ if (handle_netmsg(net) == -1) {
+ STATE(malformed)++;
+ return;
+ }
+ do_mcast_handler_step(net);
+ ptr += net->len;
+ remain -= net->len;
+ }
+}
+
static int init_sync(void)
{
int ret;
@@ -159,11 +176,6 @@ static int init_sync(void)
/* initialization of multicast sequence generation */
STATE_SYNC(last_seq_sent) = time(NULL);
- if (create_alarm_thread() == -1) {
- dlog(STATE(log), "[FAIL] can't initialize alarm thread");
- return -1;
- }
-
return 0;
}
@@ -174,11 +186,17 @@ static int add_fds_to_set_sync(fd_set *readfds)
return STATE_SYNC(mcast_server->fd);
}
-static void step_sync(fd_set *readfds)
+static void run_sync(fd_set *readfds, int step)
{
/* multicast packet has been received */
if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
mcast_handler();
+
+ if (STATE_SYNC(sync)->run)
+ STATE_SYNC(sync)->run(step);
+
+ /* flush pending messages */
+ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
}
static void kill_sync()
@@ -189,8 +207,6 @@ static void kill_sync()
mcast_server_destroy(STATE_SYNC(mcast_server));
mcast_client_destroy(STATE_SYNC(mcast_client));
- destroy_alarm_thread();
-
if (STATE_SYNC(sync)->kill)
STATE_SYNC(sync)->kill();
}
@@ -267,10 +283,6 @@ static int local_handler_sync(int fd, int type, void *data)
STATE_SYNC(mcast_server));
dump_stats_sync(fd);
break;
- case SEND_BULK:
- dlog(STATE(log), "[REQ] sending bulk update");
- cache_bulk(STATE_SYNC(internal));
- break;
default:
if (STATE_SYNC(sync)->local)
ret = STATE_SYNC(sync)->local(fd, type, data);
@@ -280,7 +292,7 @@ static int local_handler_sync(int fd, int type, void *data)
return ret;
}
-static void dump_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void dump_sync(struct nf_conntrack *ct)
{
/* This is required by kernels < 2.6.20 */
nfct_attr_unset(ct, ATTR_TIMEOUT);
@@ -294,23 +306,21 @@ static void dump_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
debug_ct(ct, "resync");
}
-static void mcast_send_sync(struct nlmsghdr *nlh,
- struct us_conntrack *u,
+static void mcast_send_sync(struct us_conntrack *u,
struct nf_conntrack *ct,
- int type)
+ int query)
{
- char __net[4096];
- struct nethdr *net = (struct nethdr *) __net;
-
- memset(__net, 0, sizeof(__net));
+ int len;
+ struct nethdr *net;
- if (!state_helper_verdict(type, ct))
+ if (!state_helper_verdict(query, ct))
return;
- memcpy(__net + NETHDR_SIZ, nlh, nlh->nlmsg_len);
- mcast_send_netmsg(STATE_SYNC(mcast_client), net);
+ net = BUILD_NETMSG(ct, query);
+ len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
if (STATE_SYNC(sync)->send)
- STATE_SYNC(sync)->send(type, net, u);
+ STATE_SYNC(sync)->send(net, u);
}
static int overrun_cb(enum nf_conntrack_msg_type type,
@@ -332,8 +342,16 @@ static int overrun_cb(enum nf_conntrack_msg_type type,
if (!cache_test(STATE_SYNC(internal), ct)) {
if ((u = cache_update_force(STATE_SYNC(internal), ct))) {
+ int len;
+
debug_ct(u->ct, "overrun resync");
- mcast_build_send_update(u);
+
+ struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE);
+ len = prepare_send_netmsg(STATE_SYNC(mcast_client),net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client),
+ net, len);
+ if (STATE_SYNC(sync)->send)
+ STATE_SYNC(sync)->send(net, u);
}
}
@@ -348,9 +366,17 @@ static int overrun_purge_step(void *data1, void *data2)
ret = nfct_query(h, NFCT_Q_GET, u->ct);
if (ret == -1 && errno == ENOENT) {
+ int len;
+ struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_DESTROY);
+
debug_ct(u->ct, "overrun purge resync");
- mcast_build_send_destroy(u);
- __cache_del(STATE_SYNC(internal), u->ct);
+
+ len = prepare_send_netmsg(STATE_SYNC(mcast_client), net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len);
+ if (STATE_SYNC(sync)->send)
+ STATE_SYNC(sync)->send(net, u);
+
+ cache_del(STATE_SYNC(internal), u->ct);
}
return 0;
@@ -382,7 +408,7 @@ static void overrun_sync()
nfct_close(h);
}
-static void event_new_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void event_new_sync(struct nf_conntrack *ct)
{
struct us_conntrack *u;
@@ -394,12 +420,12 @@ static void event_new_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
nfct_attr_unset(ct, ATTR_TIMEOUT);
retry:
if ((u = cache_add(STATE_SYNC(internal), ct))) {
- mcast_send_sync(nlh, u, ct, NFCT_T_NEW);
+ mcast_send_sync(u, ct, NFCT_Q_CREATE);
debug_ct(u->ct, "internal new");
} else {
if (errno == EEXIST) {
cache_del(STATE_SYNC(internal), ct);
- mcast_send_sync(nlh, NULL, ct, NFCT_T_DESTROY);
+ mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);
goto retry;
}
@@ -409,7 +435,7 @@ retry:
}
}
-static void event_update_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static void event_update_sync(struct nf_conntrack *ct)
{
struct us_conntrack *u;
@@ -420,15 +446,15 @@ static void event_update_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
return;
}
debug_ct(u->ct, "internal update");
- mcast_send_sync(nlh, u, ct, NFCT_T_UPDATE);
+ mcast_send_sync(u, ct, NFCT_Q_UPDATE);
}
-static int event_destroy_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
+static int event_destroy_sync(struct nf_conntrack *ct)
{
nfct_attr_unset(ct, ATTR_TIMEOUT);
if (cache_del(STATE_SYNC(internal), ct)) {
- mcast_send_sync(nlh, NULL, ct, NFCT_T_DESTROY);
+ mcast_send_sync(NULL, ct, NFCT_Q_DESTROY);
debug_ct(ct, "internal destroy");
} else
debug_ct(ct, "can't destroy");
@@ -437,7 +463,7 @@ static int event_destroy_sync(struct nf_conntrack *ct, struct nlmsghdr *nlh)
struct ct_mode sync_mode = {
.init = init_sync,
.add_fds_to_set = add_fds_to_set_sync,
- .step = step_sync,
+ .run = run_sync,
.local = local_handler_sync,
.kill = kill_sync,
.dump = dump_sync,