summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/sync/ftfw/conntrackd.conf15
-rw-r--r--include/queue.h1
-rw-r--r--src/queue.c7
-rw-r--r--src/read_config_lex.l1
-rw-r--r--src/read_config_yy.y13
-rw-r--r--src/sync-ftfw.c44
6 files changed, 66 insertions, 15 deletions
diff --git a/doc/sync/ftfw/conntrackd.conf b/doc/sync/ftfw/conntrackd.conf
index 77ef76c..4fd86d7 100644
--- a/doc/sync/ftfw/conntrackd.conf
+++ b/doc/sync/ftfw/conntrackd.conf
@@ -4,14 +4,15 @@
Sync {
Mode FTFW {
#
- # Size of the buffer that hold control messages for
- # possible resends (in bytes). Under message omission,
- # this size determines the length of the history window
- # of control message. Control messages are 16 bytes long,
- # so that we keep a history of 262144/16 = 16384 control
- # messages.
+ # Size of the resend queue (in objects). This is the maximum
+ # number of objects that can be stored waiting to be confirmed
+ # via acknoledgment. If you keep this value low, the daemon
+ # will have less chances to recover state-changes under message
+ # omission. On the other hand, if you keep this value high,
+ # the daemon will consume more memory to store dead objects.
+ # Default is 131072 objects.
#
- ResendBufferSize 262144
+ # ResendQueueSize 131072
#
# Entries committed to the connection tracking table
diff --git a/include/queue.h b/include/queue.h
index ef56323..9213b3d 100644
--- a/include/queue.h
+++ b/include/queue.h
@@ -44,6 +44,7 @@ void queue_destroy(struct queue *b);
unsigned int queue_len(const struct queue *b);
int queue_add(struct queue *b, struct queue_node *n);
int queue_del(struct queue_node *n);
+struct queue_node *queue_del_head(struct queue *b);
int queue_in(struct queue *b, struct queue_node *n);
void queue_iterate(struct queue *b,
const void *data,
diff --git a/src/queue.c b/src/queue.c
index cffcc93..7b36dc6 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -113,6 +113,13 @@ int queue_del(struct queue_node *n)
return 1;
}
+struct queue_node *queue_del_head(struct queue *b)
+{
+ struct queue_node *n = (struct queue_node *) b->head.next;
+ queue_del(n);
+ return n;
+}
+
int queue_in(struct queue *b, struct queue_node *n)
{
return b == n->owner;
diff --git a/src/read_config_lex.l b/src/read_config_lex.l
index 4953974..9bc4c18 100644
--- a/src/read_config_lex.l
+++ b/src/read_config_lex.l
@@ -88,6 +88,7 @@ notrack [N|n][O|o][T|t][R|r][A|a][C|c][K|k]
"ListenTo" { return T_LISTEN_TO; }
"Family" { return T_FAMILY; }
"ResendBufferSize" { return T_RESEND_BUFFER_SIZE; }
+"ResendQueueSize" { return T_RESEND_QUEUE_SIZE; }
"Checksum" { return T_CHECKSUM; }
"ACKWindowSize" { return T_WINDOWSIZE; }
"Replicate" { return T_REPLICATE; }
diff --git a/src/read_config_yy.y b/src/read_config_yy.y
index ce604d9..97aa178 100644
--- a/src/read_config_yy.y
+++ b/src/read_config_yy.y
@@ -54,7 +54,7 @@ static void __max_mcast_dedicated_links_reached(void);
%token T_GENERAL T_SYNC T_STATS T_RELAX_TRANSITIONS T_BUFFER_SIZE T_DELAY
%token T_SYNC_MODE T_LISTEN_TO T_FAMILY T_RESEND_BUFFER_SIZE
%token T_ALARM T_FTFW T_CHECKSUM T_WINDOWSIZE T_ON T_OFF
-%token T_REPLICATE T_FOR T_IFACE T_PURGE
+%token T_REPLICATE T_FOR T_IFACE T_PURGE T_RESEND_QUEUE_SIZE
%token T_ESTABLISHED T_SYN_SENT T_SYN_RECV T_FIN_WAIT
%token T_CLOSE_WAIT T_LAST_ACK T_TIME_WAIT T_CLOSE T_LISTEN
%token T_SYSLOG T_WRITE_THROUGH T_STAT_BUFFER_SIZE T_DESTROY_TIMEOUT
@@ -525,6 +525,7 @@ sync_mode_ftfw_list:
| sync_mode_ftfw_list sync_mode_ftfw_line;
sync_mode_ftfw_line: resend_queue_size
+ | resend_buffer_size
| timeout
| purge
| window_size
@@ -537,7 +538,13 @@ sync_mode_notrack_line: timeout
| purge
;
-resend_queue_size: T_RESEND_BUFFER_SIZE T_NUMBER
+resend_buffer_size: T_RESEND_BUFFER_SIZE T_NUMBER
+{
+ fprintf(stderr, "WARNING: `ResendBufferSize' is deprecated. "
+ "Use `ResendQueueSize' instead\n");
+};
+
+resend_queue_size: T_RESEND_QUEUE_SIZE T_NUMBER
{
conf.resend_queue_size = $2;
};
@@ -1146,7 +1153,7 @@ init_config(char *filename)
CONFIG(refresh) = 60;
if (CONFIG(resend_queue_size) == 0)
- CONFIG(resend_queue_size) = 262144;
+ CONFIG(resend_queue_size) = 131072;
/* default to a window size of 300 packets */
if (CONFIG(window_size) == 0)
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index 0d49756..493c15f 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -27,6 +27,7 @@
#include "fds.h"
#include <string.h>
+#include <errno.h>
#if 0
#define dp printf
@@ -143,7 +144,7 @@ static void do_alive_alarm(struct alarm_block *a, void *data)
static int ftfw_init(void)
{
- rs_queue = queue_create(INT_MAX, 0);
+ rs_queue = queue_create(CONFIG(resend_queue_size), 0);
if (rs_queue == NULL) {
dlog(LOG_ERR, "cannot create rs queue");
return -1;
@@ -451,6 +452,29 @@ out:
return ret;
}
+static void rs_queue_purge_full(void)
+{
+ struct queue_node *n;
+
+ n = queue_del_head(rs_queue);
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct queue_object *qobj = (struct queue_object *)n;
+ queue_object_free(qobj);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
+ struct cache_object *obj;
+
+ cn = (struct cache_ftfw *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ cache_object_put(obj);
+ break;
+ }
+ }
+}
+
static int tx_queue_xmit(struct queue_node *n, const void *data)
{
queue_del(n);
@@ -474,9 +498,14 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
HDR_NETWORK2HOST(net);
- if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net))
- queue_add(rs_queue, n);
- else
+ if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
+ if (queue_add(rs_queue, n) < 0) {
+ if (errno == ENOSPC) {
+ rs_queue_purge_full();
+ queue_add(rs_queue, n);
+ }
+ }
+ } else
queue_object_free((struct queue_object *)n);
break;
}
@@ -497,7 +526,12 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
cn->seq = ntohl(net->seq);
- queue_add(rs_queue, &cn->qnode);
+ if (queue_add(rs_queue, &cn->qnode) < 0) {
+ if (errno == ENOSPC) {
+ rs_queue_purge_full();
+ queue_add(rs_queue, &cn->qnode);
+ }
+ }
/* we release the object once we get the acknowlegment */
break;
}