summaryrefslogtreecommitdiffstats
path: root/src/sync-notrack.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-notrack.c')
-rw-r--r--src/sync-notrack.c114
1 files changed, 55 insertions, 59 deletions
diff --git a/src/sync-notrack.c b/src/sync-notrack.c
index 2d3783e..40cc199 100644
--- a/src/sync-notrack.c
+++ b/src/sync-notrack.c
@@ -23,32 +23,26 @@
#include "network.h"
#include "log.h"
#include "cache.h"
-#include "event.h"
+#include "fds.h"
#include <string.h>
-static LIST_HEAD(tx_list);
-static unsigned int tx_list_len;
static struct queue *tx_queue;
struct cache_notrack {
- struct list_head tx_list;
+ struct queue_node qnode;
};
static void cache_notrack_add(struct cache_object *obj, void *data)
{
struct cache_notrack *cn = data;
- INIT_LIST_HEAD(&cn->tx_list);
+ queue_node_init(&cn->qnode, Q_ELEM_OBJ);
}
static void cache_notrack_del(struct cache_object *obj, void *data)
{
struct cache_notrack *cn = data;
-
- if (!list_empty(&cn->tx_list)) {
- list_del(&cn->tx_list);
- tx_list_len--;
- }
+ queue_del(&cn->qnode);
}
static struct cache_extra cache_notrack_extra = {
@@ -59,20 +53,25 @@ static struct cache_extra cache_notrack_extra = {
static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
{
- struct nethdr_ack ack = {
- .type = NET_T_CTL,
- .flags = flags,
- .from = from,
- .to = to,
- };
-
- queue_add(tx_queue, &ack, NETHDR_ACK_SIZ);
- write_evfd(STATE_SYNC(evfd));
+ struct queue_object *qobj;
+ struct nethdr_ack *ack;
+
+ qobj = queue_object_new(Q_ELEM_CTL, sizeof(struct nethdr_ack));
+ if (qobj == NULL)
+ return;
+
+ ack = (struct nethdr_ack *)qobj->data;
+ ack->type = NET_T_CTL;
+ ack->flags = flags;
+ ack->from = from;
+ ack->to = to;
+
+ queue_add(tx_queue, &qobj->qnode);
}
static int notrack_init(void)
{
- tx_queue = queue_create(~0U);
+ tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);
if (tx_queue == NULL) {
dlog(LOG_ERR, "cannot create tx queue");
return -1;
@@ -90,16 +89,7 @@ static int do_cache_to_tx(void *data1, void *data2)
{
struct cache_object *obj = data2;
struct cache_notrack *cn = cache_get_extra(STATE_SYNC(internal), obj);
-
- if (!list_empty(&cn->tx_list))
- return 0;
-
- /* add to tx list */
- list_add_tail(&cn->tx_list, &tx_list);
- tx_list_len++;
-
- write_evfd(STATE_SYNC(evfd));
-
+ queue_add(tx_queue, &cn->qnode);
return 0;
}
@@ -152,44 +142,49 @@ static int notrack_recv(const struct nethdr *net)
return ret;
}
-static int tx_queue_xmit(void *data1, const void *data2)
+static int tx_queue_xmit(struct queue_node *n, const void *data2)
{
- struct nethdr *net = data1;
- nethdr_set_ack(net);
- HDR_HOST2NETWORK(net);
- mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
- queue_del(tx_queue, net);
+ switch (n->type) {
+ case Q_ELEM_CTL: {
+ struct nethdr *net = queue_node_data(n);
+ if (IS_RESYNC(net))
+ nethdr_set_ack(net);
+ else
+ nethdr_set_ctl(net);
+ HDR_HOST2NETWORK(net);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ queue_del(n);
+ queue_object_free((struct queue_object *)n);
+ break;
+ }
+ case Q_ELEM_OBJ: {
+ struct cache_ftfw *cn;
+ struct cache_object *obj;
+ int type;
+ struct nethdr *net;
+
+ cn = (struct cache_ftfw *)n;
+ obj = cache_data_get_object(STATE_SYNC(internal), cn);
+ type = object_status_to_network_type(obj->status);;
+ net = BUILD_NETMSG(obj->ct, type);
+ mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
+ queue_del(n);
+ break;
+ }
+ }
return 0;
}
-static int tx_list_xmit(struct list_head *i, struct cache_object *obj, int type)
+static void notrack_run(fd_set *readfds)
{
- int ret;
- struct nethdr *net = BUILD_NETMSG(obj->ct, type);
-
- list_del_init(i);
- tx_list_len--;
-
- ret = mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net);
-
- return ret;
+ if (FD_ISSET(queue_get_eventfd(tx_queue), readfds))
+ queue_iterate(tx_queue, NULL, tx_queue_xmit);
}
-static void notrack_run(void)
+static int notrack_register_fds(struct fds *fds)
{
- struct cache_notrack *cn, *tmp;
-
- /* send messages in the tx_queue */
- queue_iterate(tx_queue, NULL, tx_queue_xmit);
-
- /* send conntracks in the tx_list */
- list_for_each_entry_safe(cn, tmp, &tx_list, tx_list) {
- struct cache_object *obj;
-
- obj = cache_data_get_object(STATE_SYNC(internal), cn);
- tx_list_xmit(&cn->tx_list, obj, NET_T_STATE_UPD);
- }
+ return register_fd(queue_get_eventfd(tx_queue), fds);
}
struct sync_mode sync_notrack = {
@@ -201,4 +196,5 @@ struct sync_mode sync_notrack = {
.local = notrack_local,
.recv = notrack_recv,
.run = notrack_run,
+ .register_fds = notrack_register_fds,
};