summaryrefslogtreecommitdiffstats
path: root/src/sync-mode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r--src/sync-mode.c165
1 files changed, 148 insertions, 17 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c
index fa522c7..2505631 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -59,10 +59,29 @@ static struct nf_conntrack *msg2ct_alloc(struct nethdr *net, size_t remain)
return ct;
}
+static struct nf_expect *msg2exp_alloc(struct nethdr *net, size_t remain)
+{
+ struct nf_expect *exp;
+
+ /* TODO: add stats on ENOMEM errors in the future. */
+ exp = nfexp_new();
+ if (exp == NULL)
+ return NULL;
+
+ if (msg2exp(exp, net, remain) == -1) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_bad_payload++;
+ nfexp_destroy(exp);
+ return NULL;
+ }
+ return exp;
+}
+
static void
do_channel_handler_step(int i, struct nethdr *net, size_t remain)
{
- struct nf_conntrack *ct;
+ struct nf_conntrack *ct = NULL;
+ struct nf_expect *exp = NULL;
if (net->version != CONNTRACKD_PROTOCOL_VERSION) {
STATE_SYNC(error).msg_rcv_malformed++;
@@ -112,12 +131,33 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain)
return;
STATE_SYNC(external)->ct.del(ct);
break;
+ case NET_T_STATE_EXP_NEW:
+ exp = msg2exp_alloc(net, remain);
+ if (exp == NULL)
+ return;
+ STATE_SYNC(external)->exp.new(exp);
+ break;
+ case NET_T_STATE_EXP_UPD:
+ exp = msg2exp_alloc(net, remain);
+ if (exp == NULL)
+ return;
+ STATE_SYNC(external)->exp.upd(exp);
+ break;
+ case NET_T_STATE_EXP_DEL:
+ exp = msg2exp_alloc(net, remain);
+ if (exp == NULL)
+ return;
+ STATE_SYNC(external)->exp.del(exp);
+ break;
default:
STATE_SYNC(error).msg_rcv_malformed++;
STATE_SYNC(error).msg_rcv_bad_type++;
break;
}
- nfct_destroy(ct);
+ if (ct != NULL)
+ nfct_destroy(ct);
+ if (exp != NULL)
+ nfexp_destroy(exp);
}
static char __net[65536]; /* XXX: maximum MTU for IPv4 */
@@ -351,7 +391,7 @@ static int init_sync(void)
STATE(fds)) == -1)
return -1;
- STATE_SYNC(commit).h = nfct_open(CONNTRACK, 0);
+ STATE_SYNC(commit).h = nfct_open(CONFIG(netlink).subsys_id, 0);
if (STATE_SYNC(commit).h == NULL) {
dlog(LOG_ERR, "can't create handler to commit");
return -1;
@@ -402,8 +442,30 @@ static void run_sync(fd_set *readfds)
interface_handler();
if (FD_ISSET(get_read_evfd(STATE_SYNC(commit).evfd), readfds)) {
+ int ret;
+
read_evfd(STATE_SYNC(commit).evfd);
- STATE_SYNC(external)->ct.commit(STATE_SYNC(commit).h, 0);
+
+ ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, 0);
+ if (ret == 0) {
+ /* we still have things in the callback queue. */
+ if (STATE_SYNC(commit).rq[1].cb) {
+ int fd = STATE_SYNC(commit).clientfd;
+
+ STATE_SYNC(commit).rq[0].cb =
+ STATE_SYNC(commit).rq[1].cb;
+
+ STATE_SYNC(commit).rq[1].cb = NULL;
+
+ STATE_SYNC(commit).clientfd = -1;
+ STATE_SYNC(commit).rq[0].cb(
+ STATE_SYNC(commit).h, fd);
+ } else {
+ /* Close the client socket now, we're done. */
+ close(STATE_SYNC(commit).clientfd);
+ STATE_SYNC(commit).clientfd = -1;
+ }
+ }
}
/* flush pending messages */
@@ -480,6 +542,27 @@ static void dump_stats_sync_extended(int fd)
send(fd, buf, size, 0);
}
+static int local_commit(int fd)
+{
+ int ret;
+
+ /* delete the reset alarm if any before committing */
+ del_alarm(&STATE_SYNC(reset_cache_alarm));
+
+ ret = STATE_SYNC(commit).rq[0].cb(STATE_SYNC(commit).h, fd);
+ if (ret == -1) {
+ dlog(LOG_NOTICE, "commit already in progress, skipping");
+ ret = LOCAL_RET_OK;
+ } else if (ret == 0) {
+ /* we've finished the commit. */
+ ret = LOCAL_RET_OK;
+ } else {
+ /* Keep open the client, we want synchronous commit. */
+ ret = LOCAL_RET_STOLEN;
+ }
+ return ret;
+}
+
/* handler for requests coming via UNIX socket */
static int local_handler_sync(int fd, int type, void *data)
{
@@ -511,19 +594,10 @@ static int local_handler_sync(int fd, int type, void *data)
}
break;
case CT_COMMIT:
- /* delete the reset alarm if any before committing */
- del_alarm(&STATE_SYNC(reset_cache_alarm));
-
- dlog(LOG_NOTICE, "committing external cache");
- ret = STATE_SYNC(external)->ct.commit(STATE_SYNC(commit).h, fd);
- if (ret == 0) {
- dlog(LOG_NOTICE, "commit already in progress, "
- "skipping");
- ret = LOCAL_RET_OK;
- } else {
- /* Keep open the client, we want synchronous commit. */
- ret = LOCAL_RET_STOLEN;
- }
+ dlog(LOG_NOTICE, "committing conntrack cache");
+ STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
+ STATE_SYNC(commit).rq[1].cb = NULL;
+ ret = local_commit(fd);
break;
case RESET_TIMERS:
if (!alarm_pending(&STATE_SYNC(reset_cache_alarm))) {
@@ -575,6 +649,63 @@ static int local_handler_sync(int fd, int type, void *data)
case STATS_QUEUE:
queue_stats_show(fd);
break;
+ case EXP_STATS:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ STATE(mode)->internal->exp.stats(fd);
+ STATE_SYNC(external)->exp.stats(fd);
+ dump_traffic_stats(fd);
+ multichannel_stats(STATE_SYNC(channel), fd);
+ dump_stats_sync(fd);
+ break;
+ case EXP_DUMP_INTERNAL:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE(mode)->internal->exp.dump(fd, NFCT_O_PLAIN);
+ exit(EXIT_SUCCESS);
+ }
+ break;
+ case EXP_DUMP_EXTERNAL:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ if (fork_process_new(CTD_PROC_ANY, 0, NULL, NULL) == 0) {
+ STATE_SYNC(external)->exp.dump(fd, NFCT_O_PLAIN);
+ exit(EXIT_SUCCESS);
+ }
+ break;
+ case EXP_COMMIT:
+ if (!(CONFIG(flags) & CTD_EXPECT))
+ break;
+
+ dlog(LOG_NOTICE, "committing expectation cache");
+ STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->exp.commit;
+ STATE_SYNC(commit).rq[1].cb = NULL;
+ local_commit(fd);
+ break;
+ case ALL_FLUSH_CACHE:
+ dlog(LOG_NOTICE, "flushing caches");
+ STATE(mode)->internal->ct.flush();
+ STATE_SYNC(external)->ct.flush();
+ if (CONFIG(flags) & CTD_EXPECT) {
+ STATE(mode)->internal->exp.flush();
+ STATE_SYNC(external)->exp.flush();
+ }
+ break;
+ case ALL_COMMIT:
+ dlog(LOG_NOTICE, "committing all external caches");
+ STATE_SYNC(commit).rq[0].cb = STATE_SYNC(external)->ct.commit;
+ if (CONFIG(flags) & CTD_EXPECT) {
+ STATE_SYNC(commit).rq[1].cb =
+ STATE_SYNC(external)->exp.commit;
+ } else {
+ STATE_SYNC(commit).rq[1].cb = NULL;
+ }
+ local_commit(fd);
+ break;
default:
if (STATE_SYNC(sync)->local)
ret = STATE_SYNC(sync)->local(fd, type, data);