summaryrefslogtreecommitdiffstats
path: root/src/sync-ftfw.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-ftfw.c')
-rw-r--r--src/sync-ftfw.c44
1 files changed, 39 insertions, 5 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index 0d49756..493c15f 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -27,6 +27,7 @@
#include "fds.h"
#include <string.h>
+#include <errno.h>
#if 0
#define dp printf
@@ -143,7 +144,7 @@ static void do_alive_alarm(struct alarm_block *a, void *data)
static int ftfw_init(void)
{
- rs_queue = queue_create(INT_MAX, 0);
+ rs_queue = queue_create(CONFIG(resend_queue_size), 0);
if (rs_queue == NULL) {
dlog(LOG_ERR, "cannot create rs queue");
return -1;
@@ -451,6 +452,29 @@ out:
return ret;
}
+static void rs_queue_purge_full(void)
+{
+ struct queue_node *n;
+
+ n = queue_del_head(rs_queue);
+ switch(n->type) {
+ case Q_ELEM_CTL: {
+ struct queue_object *qobj = (struct queue_object *)n;
+ queue_object_free(qobj);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
+ struct cache_object *obj;
+
+ cn = (struct cache_ftfw *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ cache_object_put(obj);
+ break;
+ }
+ }
+}
+
static int tx_queue_xmit(struct queue_node *n, const void *data)
{
queue_del(n);
@@ -474,9 +498,14 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
HDR_NETWORK2HOST(net);
- if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net))
- queue_add(rs_queue, n);
- else
+ if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
+ if (queue_add(rs_queue, n) < 0) {
+ if (errno == ENOSPC) {
+ rs_queue_purge_full();
+ queue_add(rs_queue, n);
+ }
+ }
+ } else
queue_object_free((struct queue_object *)n);
break;
}
@@ -497,7 +526,12 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
cn->seq = ntohl(net->seq);
- queue_add(rs_queue, &cn->qnode);
+ if (queue_add(rs_queue, &cn->qnode) < 0) {
+ if (errno == ENOSPC) {
+ rs_queue_purge_full();
+ queue_add(rs_queue, &cn->qnode);
+ }
+ }
/* we release the object once we get the acknowlegment */
break;
}