summaryrefslogtreecommitdiffstats
path: root/src/sync-mode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r--src/sync-mode.c109
1 files changed, 87 insertions, 22 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 00e2f7b..0dbd12d 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -33,8 +33,10 @@
#include <time.h>
#include <string.h>
#include <stdlib.h>
+#include <net/if.h>
-static void do_mcast_handler_step(struct nethdr *net, size_t remain)
+static void
+do_mcast_handler_step(int if_idx, struct nethdr *net, size_t remain)
{
char __ct[nfct_maxsize()];
struct nf_conntrack *ct = (struct nf_conntrack *)(void*) __ct;
@@ -47,6 +49,9 @@ static void do_mcast_handler_step(struct nethdr *net, size_t remain)
return;
}
+ if (if_idx != mcast_get_current_ifidx(STATE_SYNC(mcast_client)))
+ mcast_set_current_link(STATE_SYNC(mcast_client), if_idx);
+
switch (STATE_SYNC(sync)->recv(net)) {
case MSG_DATA:
break;
@@ -111,13 +116,13 @@ retry:
}
/* handler for multicast messages received */
-static void mcast_handler(void)
+static void mcast_handler(struct mcast_sock *m, int if_idx)
{
ssize_t numbytes;
ssize_t remain;
char __net[65536], *ptr = __net; /* XXX: maximum MTU for IPv4 */
- numbytes = mcast_recv(STATE_SYNC(mcast_server), __net, sizeof(__net));
+ numbytes = mcast_recv(m, __net, sizeof(__net));
if (numbytes <= 0)
return;
@@ -160,12 +165,46 @@ static void mcast_handler(void)
HDR_NETWORK2HOST(net);
- do_mcast_handler_step(net, remain);
+ do_mcast_handler_step(if_idx, net, remain);
ptr += net->len;
remain -= net->len;
}
}
+/* select a new interface candidate in a round robin basis */
+static void mcast_iface_candidate(void)
+{
+ int i, idx;
+ unsigned int flags;
+ char buf[IFNAMSIZ];
+
+ for (i=0; i<STATE_SYNC(mcast_client)->num_links; i++) {
+ idx = mcast_get_ifidx(STATE_SYNC(mcast_client), i);
+ if (idx == mcast_get_current_ifidx(STATE_SYNC(mcast_client)))
+ continue;
+ nlif_get_ifflags(STATE_SYNC(mcast_iface), idx, &flags);
+ if (flags & (IFF_RUNNING | IFF_UP)) {
+ mcast_set_current_link(STATE_SYNC(mcast_client), i);
+ dlog(LOG_NOTICE, "device `%s' becomes multicast "
+ "dedicated link",
+ if_indextoname(idx, buf));
+ return;
+ }
+ }
+ dlog(LOG_ERR, "no dedicated links available!");
+}
+
+static void mcast_iface_handler(void)
+{
+ int idx = mcast_get_current_ifidx(STATE_SYNC(mcast_client));
+ unsigned int flags;
+
+ nlif_catch(STATE_SYNC(mcast_iface));
+ nlif_get_ifflags(STATE_SYNC(mcast_iface), idx, &flags);
+ if (!(flags & IFF_RUNNING) || !(flags & IFF_UP))
+ mcast_iface_candidate();
+}
+
static int init_sync(void)
{
state.sync = malloc(sizeof(struct ct_sync_state));
@@ -216,30 +255,35 @@ static int init_sync(void)
}
/* multicast server to receive events from the wire */
- STATE_SYNC(mcast_server) = mcast_server_create(&CONFIG(mcast));
+ STATE_SYNC(mcast_server) =
+ mcast_server_create_multi(CONFIG(mcast), CONFIG(mcast_links));
if (STATE_SYNC(mcast_server) == NULL) {
dlog(LOG_ERR, "can't open multicast server!");
return -1;
}
- dlog(LOG_NOTICE, "multicast server socket receiver queue "
- "has been set to %d bytes", CONFIG(mcast).rcvbuf);
-
/* multicast client to send events on the wire */
- STATE_SYNC(mcast_client) = mcast_client_create(&CONFIG(mcast));
+ STATE_SYNC(mcast_client) =
+ mcast_client_create_multi(CONFIG(mcast), CONFIG(mcast_links));
if (STATE_SYNC(mcast_client) == NULL) {
dlog(LOG_ERR, "can't open client multicast socket");
- mcast_server_destroy(STATE_SYNC(mcast_server));
+ mcast_server_destroy_multi(STATE_SYNC(mcast_server));
return -1;
}
+ /* we only use one link to send events, but all to receive them */
+ mcast_set_current_link(STATE_SYNC(mcast_client),
+ CONFIG(mcast_default_link));
- dlog(LOG_NOTICE, "multicast client socket sender queue "
- "has been set to %d bytes", CONFIG(mcast).sndbuf);
-
- if (mcast_buffered_init(CONFIG(mcast).mtu) == -1) {
+ if (mcast_buffered_init(STATE_SYNC(mcast_client)->max_mtu) == -1) {
dlog(LOG_ERR, "can't init tx buffer!");
- mcast_server_destroy(STATE_SYNC(mcast_server));
- mcast_client_destroy(STATE_SYNC(mcast_client));
+ mcast_server_destroy_multi(STATE_SYNC(mcast_server));
+ mcast_client_destroy_multi(STATE_SYNC(mcast_client));
+ return -1;
+ }
+
+ STATE_SYNC(mcast_iface) = nl_init_interface_handler();
+ if (!STATE_SYNC(mcast_iface)) {
+ dlog(LOG_ERR, "can't open interface watcher");
return -1;
}
@@ -257,7 +301,14 @@ static int init_sync(void)
static int register_fds_sync(struct fds *fds)
{
- if (register_fd(STATE_SYNC(mcast_server->fd), fds) == -1)
+ int i;
+
+ for (i=0; i<STATE_SYNC(mcast_server)->num_links; i++) {
+ int fd = mcast_get_fd(STATE_SYNC(mcast_server)->multi[i]);
+ if (register_fd(fd, fds) == -1)
+ return -1;
+ }
+ if (register_fd(nlif_fd(STATE_SYNC(mcast_iface)), fds) == -1)
return -1;
if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), fds) == -1)
@@ -268,13 +319,20 @@ static int register_fds_sync(struct fds *fds)
static void run_sync(fd_set *readfds)
{
- /* multicast packet has been received */
- if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds))
- mcast_handler();
+ int i;
+
+ for (i=0; i<STATE_SYNC(mcast_server)->num_links; i++) {
+ int fd = mcast_get_fd(STATE_SYNC(mcast_server)->multi[i]);
+ if (FD_ISSET(fd, readfds))
+ mcast_handler(STATE_SYNC(mcast_server)->multi[i], i);
+ }
if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds))
STATE_SYNC(sync)->xmit();
+ if (FD_ISSET(nlif_fd(STATE_SYNC(mcast_iface)), readfds))
+ mcast_iface_handler();
+
/* flush pending messages */
mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client));
}
@@ -284,8 +342,10 @@ static void kill_sync(void)
cache_destroy(STATE_SYNC(internal));
cache_destroy(STATE_SYNC(external));
- mcast_server_destroy(STATE_SYNC(mcast_server));
- mcast_client_destroy(STATE_SYNC(mcast_client));
+ mcast_server_destroy_multi(STATE_SYNC(mcast_server));
+ mcast_client_destroy_multi(STATE_SYNC(mcast_client));
+
+ nlif_close(STATE_SYNC(mcast_iface));
mcast_buffered_destroy();
queue_destroy(STATE_SYNC(tx_queue));
@@ -418,6 +478,11 @@ static int local_handler_sync(int fd, int type, void *data)
cache_stats_extended(STATE_SYNC(internal), fd);
cache_stats_extended(STATE_SYNC(external), fd);
break;
+ case STATS_MULTICAST:
+ mcast_dump_stats_extended(fd, STATE_SYNC(mcast_client),
+ STATE_SYNC(mcast_server),
+ STATE_SYNC(mcast_iface));
+ break;
default:
if (STATE_SYNC(sync)->local)
ret = STATE_SYNC(sync)->local(fd, type, data);