summaryrefslogtreecommitdiffstats
path: root/src/sync-alarm.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-alarm.c')
-rw-r--r--src/sync-alarm.c75
1 files changed, 64 insertions, 11 deletions
diff --git a/src/sync-alarm.c b/src/sync-alarm.c
index 34937fe..a2f17ac 100644
--- a/src/sync-alarm.c
+++ b/src/sync-alarm.c
@@ -21,14 +21,21 @@
#include "network.h"
#include "alarm.h"
#include "cache.h"
+#include "queue.h"
#include "debug.h"
#include <stdlib.h>
#include <string.h>
+struct cache_alarm {
+ struct queue_node qnode;
+ struct alarm_block alarm;
+};
+
+static void alarm_enqueue(struct cache_object *obj, int query);
+
static void refresher(struct alarm_block *a, void *data)
{
- struct nethdr *net;
struct cache_object *obj = data;
debug_ct(obj->ct, "persistence update");
@@ -37,36 +44,37 @@ static void refresher(struct alarm_block *a, void *data)
random() % CONFIG(refresh) + 1,
((random() % 5 + 1) * 200000) - 1);
- net = BUILD_NETMSG(obj->ct, NET_T_STATE_UPD);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ alarm_enqueue(obj, NET_T_STATE_UPD);
}
static void cache_alarm_add(struct cache_object *obj, void *data)
{
- struct alarm_block *a = data;
+ struct cache_alarm *ca = data;
- init_alarm(a, obj, refresher);
- add_alarm(a,
+ queue_node_init(&ca->qnode, Q_ELEM_OBJ);
+ init_alarm(&ca->alarm, obj, refresher);
+ add_alarm(&ca->alarm,
random() % CONFIG(refresh) + 1,
((random() % 5 + 1) * 200000) - 1);
}
static void cache_alarm_update(struct cache_object *obj, void *data)
{
- struct alarm_block *a = data;
- add_alarm(a,
+ struct cache_alarm *ca = data;
+ add_alarm(&ca->alarm,
random() % CONFIG(refresh) + 1,
((random() % 5 + 1) * 200000) - 1);
}
static void cache_alarm_destroy(struct cache_object *obj, void *data)
{
- struct alarm_block *a = data;
- del_alarm(a);
+ struct cache_alarm *ca = data;
+ queue_del(&ca->qnode);
+ del_alarm(&ca->alarm);
}
static struct cache_extra cache_alarm_extra = {
- .size = sizeof(struct alarm_block),
+ .size = sizeof(struct cache_alarm),
.add = cache_alarm_add,
.update = cache_alarm_update,
.destroy = cache_alarm_destroy
@@ -102,9 +110,54 @@ static int alarm_recv(const struct nethdr *net)
return 0;
}
+static void alarm_enqueue(struct cache_object *obj, int query)
+{
+ struct cache_alarm *ca = cache_get_extra(STATE_SYNC(internal), obj);
+ if (queue_add(STATE_SYNC(tx_queue), &ca->qnode))
+ cache_object_get(obj);
+}
+
+static int tx_queue_xmit(struct queue_node *n, const void *data)
+{
+ struct nethdr *net;
+
+ queue_del(n);
+
+ switch(n->type) {
+ case Q_ELEM_CTL:
+ net = queue_node_data(n);
+ nethdr_set_ctl(net);
+ HDR_HOST2NETWORK(net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ queue_object_free((struct queue_object *)n);
+ break;
+ case Q_ELEM_OBJ: {
+ struct cache_alarm *ca;
+ struct cache_object *obj;
+ int type;
+
+ ca = (struct cache_alarm *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), ca);
+ type = object_status_to_network_type(obj->status);
+ net = BUILD_NETMSG(obj->ct, type);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ cache_object_put(obj);
+ break;
+ }
+ }
+ return 0;
+}
+
+static void alarm_xmit(void)
+{
+ queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
+}
+
struct sync_mode sync_alarm = {
.internal_cache_flags = LIFETIME,
.external_cache_flags = TIMER | LIFETIME,
.internal_cache_extra = &cache_alarm_extra,
.recv = alarm_recv,
+ .enqueue = alarm_enqueue,
+ .xmit = alarm_xmit,
};