From 5bac4f06e8ebc81ed16ec93a0db8682e6a359608 Mon Sep 17 00:00:00 2001 From: "/C=EU/ST=EU/CN=Pablo Neira Ayuso/emailAddress=pablo@netfilter.org" Date: Fri, 21 Dec 2007 18:04:49 +0000 Subject: o Use more appropriate names for the existing synchronization modes: o rename `persistent' mode to `alarm' o rename `nack' mode to `ftfw' o Now default synchronization mode is ftfw instead of alarm --- src/Makefile.am | 2 +- src/main.c | 2 +- src/read_config_lex.l | 14 +- src/read_config_yy.y | 10 +- src/sync-alarm.c | 104 ++++++++++++++ src/sync-ftfw.c | 366 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/sync-mode.c | 8 +- src/sync-nack.c | 366 -------------------------------------------------- src/sync-notrack.c | 104 -------------- 9 files changed, 493 insertions(+), 483 deletions(-) create mode 100644 src/sync-alarm.c create mode 100644 src/sync-ftfw.c delete mode 100644 src/sync-nack.c delete mode 100644 src/sync-notrack.c (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 8f5c620..1fac3dc 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -15,7 +15,7 @@ conntrackd_SOURCES = alarm.c main.c run.c hash.c buffer.c \ ignore_pool.c \ cache.c cache_iterators.c \ cache_lifetime.c cache_timer.c cache_wt.c \ - sync-mode.c sync-notrack.c sync-nack.c \ + sync-mode.c sync-alarm.c sync-ftfw.c \ traffic_stats.c stats-mode.c \ network.c \ state_helper.c state_helper_tcp.c \ diff --git a/src/main.c b/src/main.c index 712b572..a3164a6 100644 --- a/src/main.c +++ b/src/main.c @@ -46,7 +46,7 @@ static const char usage_client_commands[] = " -k, kill conntrack daemon\n" " -s, dump statistics\n" " -R, resync with kernel conntrack table\n" - " -n, request resync with other node (only NACK mode)\n" + " -n, request resync with other node (only FT-FW mode)\n" " -x, dump cache in XML format (requires -i or -e)"; static const char usage_options[] = diff --git a/src/read_config_lex.l b/src/read_config_lex.l index 844cae1..55794a8 100644 --- a/src/read_config_lex.l +++ b/src/read_config_lex.l @@ -45,6 +45,8 @@ ip6 {ip6_form1}|{ip6_form2} string [a-zA-Z][a-zA-Z0-9]* persistent [P|p][E|e][R|r][S|s][I|i][S|s][T|t][E|e][N|n][T|T] nack [N|n][A|a][C|c][K|k] +alarm [A|a][L|l][A|a][R|r][M|m] +ftfw [F|f][T|t][F|f][W|w] %% "UNIX" { return T_UNIX; } @@ -107,8 +109,16 @@ nack [N|n][A|a][C|c][K|k] {ip4} { yylval.string = strdup(yytext); return T_IP; } {ip6} { yylval.string = strdup(yytext); return T_IP; } {path} { yylval.string = strdup(yytext); return T_PATH_VAL; } -{persistent} { return T_PERSISTENT; } -{nack} { return T_NACK; } +{alarm} { return T_ALARM; } +{persistent} { printf("WARNING: Now `persistent' mode is called " + "`alarm'. Please, update your " + "your conntrackd.conf file.\n"); + return T_ALARM; } +{ftfw} { return T_FTFW; } +{nack} { printf("WARNING: Now `nack' mode is called " + "`ftfw'. Please, update your " + "your conntrackd.conf file.\n"); + return T_FTFW; } {string} { yylval.string = strdup(yytext); return T_STRING; } {comment} ; diff --git a/src/read_config_yy.y b/src/read_config_yy.y index e5ce195..795aae9 100644 --- a/src/read_config_yy.y +++ b/src/read_config_yy.y @@ -45,7 +45,7 @@ struct ct_conf conf; %token T_LOCK T_STRIP_NAT T_BUFFER_SIZE_MAX_GROWN T_EXPIRE T_TIMEOUT %token T_GENERAL T_SYNC T_STATS T_RELAX_TRANSITIONS T_BUFFER_SIZE T_DELAY %token T_SYNC_MODE T_LISTEN_TO T_FAMILY T_RESEND_BUFFER_SIZE -%token T_PERSISTENT T_NACK T_CHECKSUM T_WINDOWSIZE T_ON T_OFF +%token T_ALARM T_FTFW T_CHECKSUM T_WINDOWSIZE T_ON T_OFF %token T_REPLICATE T_FOR T_IFACE %token T_ESTABLISHED T_SYN_SENT T_SYN_RECV T_FIN_WAIT %token T_CLOSE_WAIT T_LAST_ACK T_TIME_WAIT T_CLOSE T_LISTEN @@ -369,14 +369,14 @@ sync_line: refreshtime | cache_writethrough ; -sync_mode_persistent: T_SYNC_MODE T_PERSISTENT '{' sync_mode_persistent_list '}' +sync_mode_persistent: T_SYNC_MODE T_ALARM '{' sync_mode_persistent_list '}' { - conf.flags |= SYNC_MODE_PERSISTENT; + conf.flags |= SYNC_MODE_ALARM; }; -sync_mode_nack: T_SYNC_MODE T_NACK '{' sync_mode_nack_list '}' +sync_mode_nack: T_SYNC_MODE T_FTFW '{' sync_mode_nack_list '}' { - conf.flags |= SYNC_MODE_NACK; + conf.flags |= SYNC_MODE_FTFW; }; sync_mode_persistent_list: diff --git a/src/sync-alarm.c b/src/sync-alarm.c new file mode 100644 index 0000000..a0791ac --- /dev/null +++ b/src/sync-alarm.c @@ -0,0 +1,104 @@ +/* + * (C) 2006-2007 by Pablo Neira Ayuso + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include "conntrackd.h" +#include "sync.h" +#include "network.h" +#include "us-conntrack.h" +#include "alarm.h" + +static void refresher(struct alarm_list *a, void *data) +{ + int len; + struct nethdr *net; + struct us_conntrack *u = data; + + debug_ct(u->ct, "persistence update"); + + a->expires = random() % CONFIG(refresh) + 1; + net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); + len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); +} + +static void cache_alarm_add(struct us_conntrack *u, void *data) +{ + struct alarm_list *alarm = data; + + init_alarm(alarm); + set_alarm_expiration(alarm, (random() % conf.refresh) + 1); + set_alarm_data(alarm, u); + set_alarm_function(alarm, refresher); + add_alarm(alarm); +} + +static void cache_alarm_update(struct us_conntrack *u, void *data) +{ + struct alarm_list *alarm = data; + mod_alarm(alarm, (random() % conf.refresh) + 1); +} + +static void cache_alarm_destroy(struct us_conntrack *u, void *data) +{ + struct alarm_list *alarm = data; + del_alarm(alarm); +} + +static struct cache_extra cache_alarm_extra = { + .size = sizeof(struct alarm_list), + .add = cache_alarm_add, + .update = cache_alarm_update, + .destroy = cache_alarm_destroy +}; + +static int alarm_recv(const struct nethdr *net) +{ + unsigned int exp_seq; + + /* + * Ignore error messages: Although this message type is not ever + * generated in alarm mode, we don't want to crash the daemon + * if someone nuts mixes ftfw and alarm. + */ + if (net->flags) + return 1; + + /* + * Multicast sequence tracking: we keep track of multicast messages + * although we don't do any explicit message recovery. So, why do + * we do sequence tracking? Just to let know the sysadmin. + * + * Let t be 1 < t < RefreshTime. To ensure consistency, conntrackd + * retransmit every t seconds a message with the state of a certain + * entry even if such entry did not change. This mechanism also + * provides passive resynchronization, in other words, there is + * no facility to request a full synchronization from new nodes that + * just joined the cluster, instead they just get resynchronized in + * RefreshTime seconds at worst case. + */ + mcast_track_seq(net->seq, &exp_seq); + + return 0; +} + +struct sync_mode alarm = { + .internal_cache_flags = LIFETIME, + .external_cache_flags = TIMER | LIFETIME, + .internal_cache_extra = &cache_alarm_extra, + .recv = alarm_recv, +}; diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c new file mode 100644 index 0000000..2f27fe6 --- /dev/null +++ b/src/sync-ftfw.c @@ -0,0 +1,366 @@ +/* + * (C) 2006-2007 by Pablo Neira Ayuso + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include "conntrackd.h" +#include "sync.h" +#include "linux_list.h" +#include "us-conntrack.h" +#include "buffer.h" +#include "debug.h" +#include "network.h" +#include "alarm.h" +#include +#include + +#if 0 +#define dp printf +#else +#define dp +#endif + +static LIST_HEAD(rs_list); +static LIST_HEAD(tx_list); +static unsigned int tx_list_len; +static struct buffer *rs_queue; +static struct buffer *tx_queue; + +struct cache_ftfw { + struct list_head rs_list; + struct list_head tx_list; + u_int32_t seq; +}; + +static void cache_ftfw_add(struct us_conntrack *u, void *data) +{ + struct cache_ftfw *cn = data; + INIT_LIST_HEAD(&cn->rs_list); + INIT_LIST_HEAD(&cn->tx_list); +} + +static void cache_ftfw_del(struct us_conntrack *u, void *data) +{ + struct cache_ftfw *cn = data; + + if (cn->rs_list.next == &cn->rs_list && + cn->rs_list.prev == &cn->rs_list) + return; + + list_del(&cn->rs_list); +} + +static struct cache_extra cache_ftfw_extra = { + .size = sizeof(struct cache_ftfw), + .add = cache_ftfw_add, + .destroy = cache_ftfw_del +}; + +static int ftfw_init() +{ + tx_queue = buffer_create(CONFIG(resend_buffer_size)); + if (tx_queue == NULL) { + dlog(STATE(log), LOG_ERR, "cannot create tx buffer"); + return -1; + } + + rs_queue = buffer_create(CONFIG(resend_buffer_size)); + if (rs_queue == NULL) { + dlog(STATE(log), LOG_ERR, "cannot create rs buffer"); + return -1; + } + + INIT_LIST_HEAD(&tx_list); + INIT_LIST_HEAD(&rs_list); + + return 0; +} + +static void ftfw_kill() +{ + buffer_destroy(rs_queue); + buffer_destroy(tx_queue); +} + +static void tx_queue_add_ctlmsg(u_int32_t flags, u_int32_t from, u_int32_t to) +{ + struct nethdr_ack ack = { + .flags = flags, + .from = from, + .to = to, + }; + + buffer_add(tx_queue, &ack, NETHDR_ACK_SIZ); +} + +static int do_cache_to_tx(void *data1, void *data2) +{ + struct us_conntrack *u = data2; + struct cache_ftfw *cn = cache_get_extra(STATE_SYNC(internal), u); + + /* add to tx list */ + list_add(&cn->tx_list, &tx_list); + tx_list_len++; + + return 0; +} + +static int ftfw_local(int fd, int type, void *data) +{ + int ret = 1; + + switch(type) { + case REQUEST_DUMP: + dlog(STATE(log), LOG_NOTICE, "request resync"); + tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0); + break; + case SEND_BULK: + dlog(STATE(log), LOG_NOTICE, "sending bulk update"); + cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); + break; + default: + ret = 0; + break; + } + + return ret; +} + +static int rs_queue_to_tx(void *data1, void *data2) +{ + struct nethdr *net = data1; + struct nethdr_ack *nack = data2; + + if (between(net->seq, nack->from, nack->to)) { + dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", + net->seq, net->flags, net->len); + buffer_add(tx_queue, net, net->len); + } + return 0; +} + +static int rs_queue_empty(void *data1, void *data2) +{ + struct nethdr *net = data1; + struct nethdr_ack *h = data2; + + if (between(net->seq, h->from, h->to)) { + dp("remove from buffer (seq=%u)\n", net->seq); + buffer_del(rs_queue, data1); + } + return 0; +} + +static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) +{ + struct list_head *n; + struct us_conntrack *u; + + list_for_each(n, &rs_list) { + struct cache_ftfw *cn = (struct cache_ftfw *) n; + struct us_conntrack *u; + + u = cache_get_conntrack(STATE_SYNC(internal), cn); + if (between(cn->seq, from, to)) { + dp("resending nack'ed (oldseq=%u)\n", cn->seq); + list_add(&cn->tx_list, &tx_list); + tx_list_len++; + } + } +} + +static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) +{ + struct list_head *n, *tmp; + + list_for_each_safe(n, tmp, &rs_list) { + struct cache_ftfw *cn = (struct cache_ftfw *) n; + struct us_conntrack *u; + + u = cache_get_conntrack(STATE_SYNC(internal), cn); + if (between(cn->seq, from, to)) { + dp("queue: deleting from queue (seq=%u)\n", cn->seq); + list_del(&cn->rs_list); + INIT_LIST_HEAD(&cn->rs_list); + } + } +} + +static int ftfw_recv(const struct nethdr *net) +{ + static unsigned int window = 0; + unsigned int exp_seq; + + if (window == 0) + window = CONFIG(window_size); + + if (!mcast_track_seq(net->seq, &exp_seq)) { + dp("OOS: sending nack (seq=%u)\n", exp_seq); + tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); + window = CONFIG(window_size); + } else { + /* received a window, send an acknowledgement */ + if (--window == 0) { + dp("sending ack (seq=%u)\n", net->seq); + tx_queue_add_ctlmsg(NET_F_ACK, + net->seq - CONFIG(window_size), + net->seq); + } + } + + if (IS_NACK(net)) { + struct nethdr_ack *nack = (struct nethdr_ack *) net; + + dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to); + rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); + buffer_iterate(rs_queue, nack, rs_queue_to_tx); + return 1; + } else if (IS_RESYNC(net)) { + dp("RESYNC ALL\n"); + cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); + return 1; + } else if (IS_ACK(net)) { + struct nethdr_ack *h = (struct nethdr_ack *) net; + + dp("ACK: from seq=%u to seq=%u\n", h->from, h->to); + rs_list_empty(STATE_SYNC(internal), h->from, h->to); + buffer_iterate(rs_queue, h, rs_queue_empty); + return 1; + } else if (IS_ALIVE(net)) + return 1; + + return 0; +} + +static void ftfw_send(struct nethdr *net, struct us_conntrack *u) +{ + struct netpld *pld = NETHDR_DATA(net); + struct cache_ftfw *cn; + + HDR_NETWORK2HOST(net); + + switch(ntohs(pld->query)) { + case NFCT_Q_CREATE: + case NFCT_Q_UPDATE: + cn = (struct cache_ftfw *) + cache_get_extra(STATE_SYNC(internal), u); + + if (cn->rs_list.next == &cn->rs_list && + cn->rs_list.prev == &cn->rs_list) + goto insert; + + list_del(&cn->rs_list); + INIT_LIST_HEAD(&cn->rs_list); +insert: + cn->seq = net->seq; + list_add(&cn->rs_list, &rs_list); + break; + case NFCT_Q_DESTROY: + buffer_add(rs_queue, net, net->len); + break; + } +} + +static int tx_queue_xmit(void *data1, void *data2) +{ + struct nethdr *net = data1; + int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); + + dp("tx_queue sq: %u fl:%u len:%u\n", + ntohl(net->seq), ntohs(net->flags), ntohs(net->len)); + + mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); + HDR_NETWORK2HOST(net); + + if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) { + dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n", + net->seq, net->flags, net->len); + buffer_add(rs_queue, net, net->len); + } + buffer_del(tx_queue, net); + + return 0; +} + +static int tx_list_xmit(struct list_head *i, struct us_conntrack *u) +{ + int ret; + struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); + int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); + + dp("tx_list sq: %u fl:%u len:%u\n", + ntohl(net->seq), ntohs(net->flags), + ntohs(net->len)); + + list_del(i); + INIT_LIST_HEAD(i); + tx_list_len--; + + ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); + if (STATE_SYNC(sync)->send) + STATE_SYNC(sync)->send(net, u); + + return ret; +} + +static struct alarm_list alive_alarm; + +static void do_alive_alarm(struct alarm_list *a, void *data) +{ + del_alarm(a); + tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); +} + +static void ftfw_run(int step) +{ + struct list_head *i, *tmp; + + /* send messages in the tx_queue */ + buffer_iterate(tx_queue, NULL, tx_queue_xmit); + + /* send conntracks in the tx_list */ + list_for_each_safe(i, tmp, &tx_list) { + struct cache_ftfw *cn; + struct us_conntrack *u; + + cn = container_of(i, struct cache_ftfw, tx_list); + u = cache_get_conntrack(STATE_SYNC(internal), cn); + tx_list_xmit(i, u); + } + + if (alive_alarm.expires > 0) + mod_alarm(&alive_alarm, 1); + else { + init_alarm(&alive_alarm); + /* XXX: alive message expiration configurable */ + set_alarm_expiration(&alive_alarm, 1); + set_alarm_function(&alive_alarm, do_alive_alarm); + add_alarm(&alive_alarm); + } +} + +struct sync_mode ftfw = { + .internal_cache_flags = LIFETIME, + .external_cache_flags = LIFETIME, + .internal_cache_extra = &cache_ftfw_extra, + .init = ftfw_init, + .kill = ftfw_kill, + .local = ftfw_local, + .recv = ftfw_recv, + .send = ftfw_send, + .run = ftfw_run, +}; diff --git a/src/sync-mode.c b/src/sync-mode.c index 8a19ac5..7cd2b84 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -126,11 +126,11 @@ static int init_sync(void) } memset(state.sync, 0, sizeof(struct ct_sync_state)); - if (CONFIG(flags) & SYNC_MODE_NACK) - STATE_SYNC(sync) = &nack; + if (CONFIG(flags) & SYNC_MODE_FTFW) + STATE_SYNC(sync) = &ftfw; else - /* default to persistent mode */ - STATE_SYNC(sync) = ¬rack; + /* default to ftfw mode */ + STATE_SYNC(sync) = &ftfw; if (STATE_SYNC(sync)->init) STATE_SYNC(sync)->init(); diff --git a/src/sync-nack.c b/src/sync-nack.c deleted file mode 100644 index fa61be4..0000000 --- a/src/sync-nack.c +++ /dev/null @@ -1,366 +0,0 @@ -/* - * (C) 2006-2007 by Pablo Neira Ayuso - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include -#include "conntrackd.h" -#include "sync.h" -#include "linux_list.h" -#include "us-conntrack.h" -#include "buffer.h" -#include "debug.h" -#include "network.h" -#include "alarm.h" -#include -#include - -#if 0 -#define dp printf -#else -#define dp -#endif - -static LIST_HEAD(rs_list); -static LIST_HEAD(tx_list); -static unsigned int tx_list_len; -static struct buffer *rs_queue; -static struct buffer *tx_queue; - -struct cache_nack { - struct list_head rs_list; - struct list_head tx_list; - u_int32_t seq; -}; - -static void cache_nack_add(struct us_conntrack *u, void *data) -{ - struct cache_nack *cn = data; - INIT_LIST_HEAD(&cn->rs_list); - INIT_LIST_HEAD(&cn->tx_list); -} - -static void cache_nack_del(struct us_conntrack *u, void *data) -{ - struct cache_nack *cn = data; - - if (cn->rs_list.next == &cn->rs_list && - cn->rs_list.prev == &cn->rs_list) - return; - - list_del(&cn->rs_list); -} - -static struct cache_extra cache_nack_extra = { - .size = sizeof(struct cache_nack), - .add = cache_nack_add, - .destroy = cache_nack_del -}; - -static int nack_init() -{ - tx_queue = buffer_create(CONFIG(resend_buffer_size)); - if (tx_queue == NULL) { - dlog(STATE(log), LOG_ERR, "cannot create tx buffer"); - return -1; - } - - rs_queue = buffer_create(CONFIG(resend_buffer_size)); - if (rs_queue == NULL) { - dlog(STATE(log), LOG_ERR, "cannot create rs buffer"); - return -1; - } - - INIT_LIST_HEAD(&tx_list); - INIT_LIST_HEAD(&rs_list); - - return 0; -} - -static void nack_kill() -{ - buffer_destroy(rs_queue); - buffer_destroy(tx_queue); -} - -static void tx_queue_add_ctlmsg(u_int32_t flags, u_int32_t from, u_int32_t to) -{ - struct nethdr_ack ack = { - .flags = flags, - .from = from, - .to = to, - }; - - buffer_add(tx_queue, &ack, NETHDR_ACK_SIZ); -} - -static int do_cache_to_tx(void *data1, void *data2) -{ - struct us_conntrack *u = data2; - struct cache_nack *cn = cache_get_extra(STATE_SYNC(internal), u); - - /* add to tx list */ - list_add(&cn->tx_list, &tx_list); - tx_list_len++; - - return 0; -} - -static int nack_local(int fd, int type, void *data) -{ - int ret = 1; - - switch(type) { - case REQUEST_DUMP: - dlog(STATE(log), LOG_NOTICE, "request resync"); - tx_queue_add_ctlmsg(NET_F_RESYNC, 0, 0); - break; - case SEND_BULK: - dlog(STATE(log), LOG_NOTICE, "sending bulk update"); - cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); - break; - default: - ret = 0; - break; - } - - return ret; -} - -static int rs_queue_to_tx(void *data1, void *data2) -{ - struct nethdr *net = data1; - struct nethdr_ack *nack = data2; - - if (between(net->seq, nack->from, nack->to)) { - dp("rs_queue_to_tx sq: %u fl:%u len:%u\n", - net->seq, net->flags, net->len); - buffer_add(tx_queue, net, net->len); - } - return 0; -} - -static int rs_queue_empty(void *data1, void *data2) -{ - struct nethdr *net = data1; - struct nethdr_ack *h = data2; - - if (between(net->seq, h->from, h->to)) { - dp("remove from buffer (seq=%u)\n", net->seq); - buffer_del(rs_queue, data1); - } - return 0; -} - -static void rs_list_to_tx(struct cache *c, unsigned int from, unsigned int to) -{ - struct list_head *n; - struct us_conntrack *u; - - list_for_each(n, &rs_list) { - struct cache_nack *cn = (struct cache_nack *) n; - struct us_conntrack *u; - - u = cache_get_conntrack(STATE_SYNC(internal), cn); - if (between(cn->seq, from, to)) { - dp("resending nack'ed (oldseq=%u)\n", cn->seq); - list_add(&cn->tx_list, &tx_list); - tx_list_len++; - } - } -} - -static void rs_list_empty(struct cache *c, unsigned int from, unsigned int to) -{ - struct list_head *n, *tmp; - - list_for_each_safe(n, tmp, &rs_list) { - struct cache_nack *cn = (struct cache_nack *) n; - struct us_conntrack *u; - - u = cache_get_conntrack(STATE_SYNC(internal), cn); - if (between(cn->seq, from, to)) { - dp("queue: deleting from queue (seq=%u)\n", cn->seq); - list_del(&cn->rs_list); - INIT_LIST_HEAD(&cn->rs_list); - } - } -} - -static int nack_recv(const struct nethdr *net) -{ - static unsigned int window = 0; - unsigned int exp_seq; - - if (window == 0) - window = CONFIG(window_size); - - if (!mcast_track_seq(net->seq, &exp_seq)) { - dp("OOS: sending nack (seq=%u)\n", exp_seq); - tx_queue_add_ctlmsg(NET_F_NACK, exp_seq, net->seq-1); - window = CONFIG(window_size); - } else { - /* received a window, send an acknowledgement */ - if (--window == 0) { - dp("sending ack (seq=%u)\n", net->seq); - tx_queue_add_ctlmsg(NET_F_ACK, - net->seq - CONFIG(window_size), - net->seq); - } - } - - if (IS_NACK(net)) { - struct nethdr_ack *nack = (struct nethdr_ack *) net; - - dp("NACK: from seq=%u to seq=%u\n", nack->from, nack->to); - rs_list_to_tx(STATE_SYNC(internal), nack->from, nack->to); - buffer_iterate(rs_queue, nack, rs_queue_to_tx); - return 1; - } else if (IS_RESYNC(net)) { - dp("RESYNC ALL\n"); - cache_iterate(STATE_SYNC(internal), NULL, do_cache_to_tx); - return 1; - } else if (IS_ACK(net)) { - struct nethdr_ack *h = (struct nethdr_ack *) net; - - dp("ACK: from seq=%u to seq=%u\n", h->from, h->to); - rs_list_empty(STATE_SYNC(internal), h->from, h->to); - buffer_iterate(rs_queue, h, rs_queue_empty); - return 1; - } else if (IS_ALIVE(net)) - return 1; - - return 0; -} - -static void nack_send(struct nethdr *net, struct us_conntrack *u) -{ - struct netpld *pld = NETHDR_DATA(net); - struct cache_nack *cn; - - HDR_NETWORK2HOST(net); - - switch(ntohs(pld->query)) { - case NFCT_Q_CREATE: - case NFCT_Q_UPDATE: - cn = (struct cache_nack *) - cache_get_extra(STATE_SYNC(internal), u); - - if (cn->rs_list.next == &cn->rs_list && - cn->rs_list.prev == &cn->rs_list) - goto insert; - - list_del(&cn->rs_list); - INIT_LIST_HEAD(&cn->rs_list); -insert: - cn->seq = net->seq; - list_add(&cn->rs_list, &rs_list); - break; - case NFCT_Q_DESTROY: - buffer_add(rs_queue, net, net->len); - break; - } -} - -static int tx_queue_xmit(void *data1, void *data2) -{ - struct nethdr *net = data1; - int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); - - dp("tx_queue sq: %u fl:%u len:%u\n", - ntohl(net->seq), ntohs(net->flags), ntohs(net->len)); - - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); - HDR_NETWORK2HOST(net); - - if (IS_DATA(net) || IS_ACK(net) || IS_NACK(net)) { - dp("-> back_to_tx_queue sq: %u fl:%u len:%u\n", - net->seq, net->flags, net->len); - buffer_add(rs_queue, net, net->len); - } - buffer_del(tx_queue, net); - - return 0; -} - -static int tx_list_xmit(struct list_head *i, struct us_conntrack *u) -{ - int ret; - struct nethdr *net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); - int len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); - - dp("tx_list sq: %u fl:%u len:%u\n", - ntohl(net->seq), ntohs(net->flags), - ntohs(net->len)); - - list_del(i); - INIT_LIST_HEAD(i); - tx_list_len--; - - ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); - if (STATE_SYNC(sync)->send) - STATE_SYNC(sync)->send(net, u); - - return ret; -} - -static struct alarm_list alive_alarm; - -static void do_alive_alarm(struct alarm_list *a, void *data) -{ - del_alarm(a); - tx_queue_add_ctlmsg(NET_F_ALIVE, 0, 0); -} - -static void nack_run(int step) -{ - struct list_head *i, *tmp; - - /* send messages in the tx_queue */ - buffer_iterate(tx_queue, NULL, tx_queue_xmit); - - /* send conntracks in the tx_list */ - list_for_each_safe(i, tmp, &tx_list) { - struct cache_nack *cn; - struct us_conntrack *u; - - cn = container_of(i, struct cache_nack, tx_list); - u = cache_get_conntrack(STATE_SYNC(internal), cn); - tx_list_xmit(i, u); - } - - if (alive_alarm.expires > 0) - mod_alarm(&alive_alarm, 1); - else { - init_alarm(&alive_alarm); - /* XXX: alive message expiration configurable */ - set_alarm_expiration(&alive_alarm, 1); - set_alarm_function(&alive_alarm, do_alive_alarm); - add_alarm(&alive_alarm); - } -} - -struct sync_mode nack = { - .internal_cache_flags = LIFETIME, - .external_cache_flags = LIFETIME, - .internal_cache_extra = &cache_nack_extra, - .init = nack_init, - .kill = nack_kill, - .local = nack_local, - .recv = nack_recv, - .send = nack_send, - .run = nack_run, -}; diff --git a/src/sync-notrack.c b/src/sync-notrack.c deleted file mode 100644 index 8588ecf..0000000 --- a/src/sync-notrack.c +++ /dev/null @@ -1,104 +0,0 @@ -/* - * (C) 2006-2007 by Pablo Neira Ayuso - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include "conntrackd.h" -#include "sync.h" -#include "network.h" -#include "us-conntrack.h" -#include "alarm.h" - -static void refresher(struct alarm_list *a, void *data) -{ - int len; - struct nethdr *net; - struct us_conntrack *u = data; - - debug_ct(u->ct, "persistence update"); - - a->expires = random() % CONFIG(refresh) + 1; - net = BUILD_NETMSG(u->ct, NFCT_Q_UPDATE); - len = prepare_send_netmsg(STATE_SYNC(mcast_client), net); - mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net, len); -} - -static void cache_notrack_add(struct us_conntrack *u, void *data) -{ - struct alarm_list *alarm = data; - - init_alarm(alarm); - set_alarm_expiration(alarm, (random() % conf.refresh) + 1); - set_alarm_data(alarm, u); - set_alarm_function(alarm, refresher); - add_alarm(alarm); -} - -static void cache_notrack_update(struct us_conntrack *u, void *data) -{ - struct alarm_list *alarm = data; - mod_alarm(alarm, (random() % conf.refresh) + 1); -} - -static void cache_notrack_destroy(struct us_conntrack *u, void *data) -{ - struct alarm_list *alarm = data; - del_alarm(alarm); -} - -static struct cache_extra cache_notrack_extra = { - .size = sizeof(struct alarm_list), - .add = cache_notrack_add, - .update = cache_notrack_update, - .destroy = cache_notrack_destroy -}; - -static int notrack_recv(const struct nethdr *net) -{ - unsigned int exp_seq; - - /* - * Ignore error messages: Although this message type is not ever - * generated in notrack mode, we don't want to crash the daemon - * if someone nuts mixes nack and notrack. - */ - if (net->flags) - return 1; - - /* - * Multicast sequence tracking: we keep track of multicast messages - * although we don't do any explicit message recovery. So, why do - * we do sequence tracking? Just to let know the sysadmin. - * - * Let t be 1 < t < RefreshTime. To ensure consistency, conntrackd - * retransmit every t seconds a message with the state of a certain - * entry even if such entry did not change. This mechanism also - * provides passive resynchronization, in other words, there is - * no facility to request a full synchronization from new nodes that - * just joined the cluster, instead they just get resynchronized in - * RefreshTime seconds at worst case. - */ - mcast_track_seq(net->seq, &exp_seq); - - return 0; -} - -struct sync_mode notrack = { - .internal_cache_flags = LIFETIME, - .external_cache_flags = TIMER | LIFETIME, - .internal_cache_extra = &cache_notrack_extra, - .recv = notrack_recv, -}; -- cgit v1.2.3