From cf3be894fcb95adb360425c8482954522e9110d2 Mon Sep 17 00:00:00 2001 From: Pablo Neira Ayuso Date: Sun, 23 Aug 2009 12:11:20 +0200 Subject: conntrackd: add support state-replication based on TCP This patch adds support for TCP as protocol to replicate state-changes between two daemons. Note that this only makes sense with the notrack mode. Signed-off-by: Pablo Neira Ayuso --- doc/sync/notrack/conntrackd.conf | 3 +- include/Makefile.am | 2 +- include/channel.h | 18 +- include/mcast.h | 1 + include/tcp.h | 75 +++++++ include/udp.h | 1 + src/Makefile.am | 1 + src/channel.c | 17 ++ src/channel_mcast.c | 15 ++ src/channel_tcp.c | 149 +++++++++++++ src/channel_udp.c | 15 ++ src/mcast.c | 5 + src/read_config_lex.l | 1 + src/read_config_yy.y | 158 +++++++++++++- src/sync-mode.c | 65 ++++-- src/tcp.c | 440 +++++++++++++++++++++++++++++++++++++++ src/udp.c | 5 + 17 files changed, 954 insertions(+), 17 deletions(-) create mode 100644 include/tcp.h create mode 100644 src/channel_tcp.c create mode 100644 src/tcp.c diff --git a/doc/sync/notrack/conntrackd.conf b/doc/sync/notrack/conntrackd.conf index 9cdb2c7..25c4e7f 100644 --- a/doc/sync/notrack/conntrackd.conf +++ b/doc/sync/notrack/conntrackd.conf @@ -122,7 +122,8 @@ Sync { # # You can use Unicast UDP instead of Multicast to propagate events. # Note that you cannot use unicast UDP and Multicast at the same - # time, you can only select one. + # time, you can only select one. You can also select TCP in notrack + # mode. 
# # UDP { # diff --git a/include/Makefile.am b/include/Makefile.am index 844c5b8..a89490e 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1,6 +1,6 @@ noinst_HEADERS = alarm.h jhash.h cache.h linux_list.h linux_rbtree.h \ - sync.h conntrackd.h local.h udp.h \ + sync.h conntrackd.h local.h udp.h tcp.h \ debug.h log.h hash.h mcast.h conntrack.h \ network.h filter.h queue.h vector.h cidr.h \ traffic_stats.h netlink.h fds.h event.h bitops.h channel.h \ diff --git a/include/channel.h b/include/channel.h index 1d3c48c..98605d9 100644 --- a/include/channel.h +++ b/include/channel.h @@ -3,6 +3,7 @@ #include "mcast.h" #include "udp.h" +#include "tcp.h" struct channel; struct nethdr; @@ -11,6 +12,7 @@ enum { CHANNEL_NONE, CHANNEL_MCAST, CHANNEL_UDP, + CHANNEL_TCP, CHANNEL_MAX, }; @@ -24,13 +26,20 @@ struct udp_channel { struct udp_sock *server; }; +struct tcp_channel { + struct tcp_sock *client; + struct tcp_sock *server; +}; + #define CHANNEL_F_DEFAULT (1 << 0) #define CHANNEL_F_BUFFERED (1 << 1) -#define CHANNEL_F_MAX (1 << 2) +#define CHANNEL_F_STREAM (1 << 2) +#define CHANNEL_F_MAX (1 << 3) union channel_type_conf { struct mcast_conf mcast; struct udp_conf udp; + struct tcp_conf tcp; }; struct channel_conf { @@ -47,7 +56,10 @@ struct channel_ops { void (*close)(void *channel); int (*send)(void *channel, const void *data, int len); int (*recv)(void *channel, char *buf, int len); + int (*accept)(struct channel *c); int (*get_fd)(void *channel); + int (*isset)(struct channel *c, fd_set *readfds); + int (*accept_isset)(struct channel *c, fd_set *readfds); void (*stats)(struct channel *c, int fd); void (*stats_extended)(struct channel *c, int active, struct nlif_handle *h, int fd); @@ -72,8 +84,12 @@ void channel_close(struct channel *c); int channel_send(struct channel *c, const struct nethdr *net); int channel_send_flush(struct channel *c); int channel_recv(struct channel *c, char *buf, int size); +int channel_accept(struct channel *c); int 
channel_get_fd(struct channel *c); +int channel_accept_isset(struct channel *c, fd_set *readfds); +int channel_isset(struct channel *c, fd_set *readfds); + void channel_stats(struct channel *c, int fd); void channel_stats_extended(struct channel *c, int active, struct nlif_handle *h, int fd); diff --git a/include/mcast.h b/include/mcast.h index 38c77f9..402a033 100644 --- a/include/mcast.h +++ b/include/mcast.h @@ -48,6 +48,7 @@ ssize_t mcast_send(struct mcast_sock *m, const void *data, int size); ssize_t mcast_recv(struct mcast_sock *m, void *data, int size); int mcast_get_fd(struct mcast_sock *m); +int mcast_isset(struct mcast_sock *m, fd_set *readfds); int mcast_snprintf_stats(char *buf, size_t buflen, char *ifname, struct mcast_stats *s, struct mcast_stats *r); diff --git a/include/tcp.h b/include/tcp.h new file mode 100644 index 0000000..1b1d391 --- /dev/null +++ b/include/tcp.h @@ -0,0 +1,75 @@ +#ifndef _TCP_H_ +#define _TCP_H_ + +#include +#include + +struct tcp_conf { + int ipproto; + int reuseaddr; + int checksum; + unsigned short port; + union { + struct { + struct in_addr inet_addr; + } ipv4; + struct { + struct in6_addr inet_addr6; + int scope_id; + } ipv6; + } server; + union { + struct in_addr inet_addr; + struct in6_addr inet_addr6; + } client; + int sndbuf; + int rcvbuf; +}; + +struct tcp_stats { + uint64_t bytes; + uint64_t messages; + uint64_t error; +}; + +enum tcp_sock_state { + TCP_SERVER_ACCEPTING, + TCP_SERVER_CONNECTED, + TCP_CLIENT_DISCONNECTED, + TCP_CLIENT_CONNECTED +}; + +struct tcp_sock { + int state; /* enum tcp_sock_state */ + int fd; + int client_fd; /* only for the server side */ + union { + struct sockaddr_in ipv4; + struct sockaddr_in6 ipv6; + } addr; + socklen_t sockaddr_len; + struct tcp_stats stats; +}; + +struct tcp_sock *tcp_server_create(struct tcp_conf *conf); +void tcp_server_destroy(struct tcp_sock *m); + +struct tcp_sock *tcp_client_create(struct tcp_conf *conf); +void tcp_client_destroy(struct tcp_sock *m); + +ssize_t 
tcp_send(struct tcp_sock *m, const void *data, int size); +ssize_t tcp_recv(struct tcp_sock *m, void *data, int size); +int tcp_accept(struct tcp_sock *m); + +int tcp_get_fd(struct tcp_sock *m); +int tcp_isset(struct tcp_sock *m, fd_set *readfds); +int tcp_accept_isset(struct tcp_sock *m, fd_set *readfds); + +int tcp_snprintf_stats(char *buf, size_t buflen, char *ifname, + struct tcp_sock *s, struct tcp_sock *r); + +int tcp_snprintf_stats2(char *buf, size_t buflen, const char *ifname, + const char *status, int active, + struct tcp_stats *s, struct tcp_stats *r); + +#endif diff --git a/include/udp.h b/include/udp.h index 6c659b9..9f9c17a 100644 --- a/include/udp.h +++ b/include/udp.h @@ -52,6 +52,7 @@ ssize_t udp_send(struct udp_sock *m, const void *data, int size); ssize_t udp_recv(struct udp_sock *m, void *data, int size); int udp_get_fd(struct udp_sock *m); +int udp_isset(struct udp_sock *m, fd_set *readfds); int udp_snprintf_stats(char *buf, size_t buflen, char *ifname, struct udp_stats *s, struct udp_stats *r); diff --git a/src/Makefile.am b/src/Makefile.am index e969f4d..8b36642 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -20,6 +20,7 @@ conntrackd_SOURCES = alarm.c main.c run.c hash.c queue.c rbtree.c \ network.c cidr.c \ build.c parse.c \ channel.c multichannel.c channel_mcast.c channel_udp.c \ + tcp.c channel_tcp.c \ external_cache.c external_inject.c \ read_config_yy.y read_config_lex.l diff --git a/src/channel.c b/src/channel.c index 9d74b7f..76fb057 100644 --- a/src/channel.c +++ b/src/channel.c @@ -20,11 +20,13 @@ static struct channel_ops *ops[CHANNEL_MAX]; extern struct channel_ops channel_mcast; extern struct channel_ops channel_udp; +extern struct channel_ops channel_tcp; void channel_init(void) { ops[CHANNEL_MCAST] = &channel_mcast; ops[CHANNEL_UDP] = &channel_udp; + ops[CHANNEL_TCP] = &channel_tcp; } #define HEADERSIZ 28 /* IP header (20 bytes) + UDP header 8 (bytes) */ @@ -183,3 +185,18 @@ void channel_stats_extended(struct channel *c, 
int active, { return c->ops->stats_extended(c, active, h, fd); } + +int channel_accept_isset(struct channel *c, fd_set *readfds) +{ + return c->ops->accept_isset(c, readfds); +} + +int channel_isset(struct channel *c, fd_set *readfds) +{ + return c->ops->isset(c, readfds); +} + +int channel_accept(struct channel *c) +{ + return c->ops->accept(c); +} diff --git a/src/channel_mcast.c b/src/channel_mcast.c index 898b194..9fcacac 100644 --- a/src/channel_mcast.c +++ b/src/channel_mcast.c @@ -112,12 +112,27 @@ channel_mcast_stats_extended(struct channel *c, int active, send(fd, buf, size, 0); } +static int +channel_mcast_isset(struct channel *c, fd_set *readfds) +{ + struct mcast_channel *m = c->data; + return mcast_isset(m->server, readfds); +} + +static int +channel_mcast_accept_isset(struct channel *c, fd_set *readfds) +{ + return 0; +} + struct channel_ops channel_mcast = { .open = channel_mcast_open, .close = channel_mcast_close, .send = channel_mcast_send, .recv = channel_mcast_recv, .get_fd = channel_mcast_get_fd, + .isset = channel_mcast_isset, + .accept_isset = channel_mcast_accept_isset, .stats = channel_mcast_stats, .stats_extended = channel_mcast_stats_extended, }; diff --git a/src/channel_tcp.c b/src/channel_tcp.c new file mode 100644 index 0000000..9fb4b07 --- /dev/null +++ b/src/channel_tcp.c @@ -0,0 +1,149 @@ +/* + * (C) 2009 by Pablo Neira Ayuso + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * TCP support has been sponsored by 6WIND . 
+ */ + +#include +#include + +#include "channel.h" +#include "tcp.h" + +static void +*channel_tcp_open(void *conf) +{ + struct tcp_channel *m; + struct tcp_conf *c = conf; + + m = calloc(sizeof(struct tcp_channel), 1); + if (m == NULL) + return NULL; + + m->client = tcp_client_create(c); + if (m->client == NULL) { + free(m); + return NULL; + } + + m->server = tcp_server_create(c); + if (m->server == NULL) { + tcp_client_destroy(m->client); + free(m); + return NULL; + } + return m; +} + +static int +channel_tcp_send(void *channel, const void *data, int len) +{ + struct tcp_channel *m = channel; + return tcp_send(m->client, data, len); +} + +static int +channel_tcp_recv(void *channel, char *buf, int size) +{ + struct tcp_channel *m = channel; + return tcp_recv(m->server, buf, size); +} + +static void +channel_tcp_close(void *channel) +{ + struct tcp_channel *m = channel; + tcp_client_destroy(m->client); + tcp_server_destroy(m->server); + free(m); +} + +static int +channel_tcp_get_fd(void *channel) +{ + struct tcp_channel *m = channel; + return tcp_get_fd(m->server); +} + +static void +channel_tcp_stats(struct channel *c, int fd) +{ + struct tcp_channel *m = c->data; + char ifname[IFNAMSIZ], buf[512]; + int size; + + if_indextoname(c->channel_ifindex, ifname); + size = tcp_snprintf_stats(buf, sizeof(buf), ifname, + m->client, m->server); + send(fd, buf, size, 0); +} + +static void +channel_tcp_stats_extended(struct channel *c, int active, + struct nlif_handle *h, int fd) +{ + struct tcp_channel *m = c->data; + char ifname[IFNAMSIZ], buf[512]; + const char *status; + unsigned int flags; + int size; + + if_indextoname(c->channel_ifindex, ifname); + nlif_get_ifflags(h, c->channel_ifindex, &flags); + /* + * IFF_UP shows administrative status + * IFF_RUNNING shows carrier status + */ + if (flags & IFF_UP) { + if (!(flags & IFF_RUNNING)) + status = "NO-CARRIER"; + else + status = "RUNNING"; + } else { + status = "DOWN"; + } + size = tcp_snprintf_stats2(buf, sizeof(buf), + 
ifname, status, active, + &m->client->stats, + &m->server->stats); + send(fd, buf, size, 0); +} + +static int +channel_tcp_isset(struct channel *c, fd_set *readfds) +{ + struct tcp_channel *m = c->data; + return tcp_isset(m->server, readfds); +} + +static int +channel_tcp_accept_isset(struct channel *c, fd_set *readfds) +{ + struct tcp_channel *m = c->data; + return tcp_accept_isset(m->server, readfds); +} + +static int +channel_tcp_accept(struct channel *c) +{ + struct tcp_channel *m = c->data; + return tcp_accept(m->server); +} + +struct channel_ops channel_tcp = { + .open = channel_tcp_open, + .close = channel_tcp_close, + .send = channel_tcp_send, + .recv = channel_tcp_recv, + .accept = channel_tcp_accept, + .get_fd = channel_tcp_get_fd, + .isset = channel_tcp_isset, + .accept_isset = channel_tcp_accept_isset, + .stats = channel_tcp_stats, + .stats_extended = channel_tcp_stats_extended, +}; diff --git a/src/channel_udp.c b/src/channel_udp.c index 1c15b47..5c88647 100644 --- a/src/channel_udp.c +++ b/src/channel_udp.c @@ -112,12 +112,27 @@ channel_udp_stats_extended(struct channel *c, int active, send(fd, buf, size, 0); } +static int +channel_udp_isset(struct channel *c, fd_set *readfds) +{ + struct udp_channel *m = c->data; + return udp_isset(m->server, readfds); +} + +static int +channel_udp_accept_isset(struct channel *c, fd_set *readfds) +{ + return 0; +} + struct channel_ops channel_udp = { .open = channel_udp_open, .close = channel_udp_close, .send = channel_udp_send, .recv = channel_udp_recv, .get_fd = channel_udp_get_fd, + .isset = channel_udp_isset, + .accept_isset = channel_udp_accept_isset, .stats = channel_udp_stats, .stats_extended = channel_udp_stats_extended, }; diff --git a/src/mcast.c b/src/mcast.c index ec11100..4107d5d 100644 --- a/src/mcast.c +++ b/src/mcast.c @@ -304,6 +304,11 @@ int mcast_get_fd(struct mcast_sock *m) return m->fd; } +int mcast_isset(struct mcast_sock *m, fd_set *readfds) +{ + return FD_ISSET(m->fd, readfds); +} + int 
mcast_snprintf_stats(char *buf, size_t buflen, char *ifname, struct mcast_stats *s, struct mcast_stats *r) diff --git a/src/read_config_lex.l b/src/read_config_lex.l index d3f83aa..9c53c6c 100644 --- a/src/read_config_lex.l +++ b/src/read_config_lex.l @@ -65,6 +65,7 @@ notrack [N|n][O|o][T|t][R|r][A|a][C|c][K|k] "Interface" { return T_IFACE; } "Multicast" { return T_MULTICAST; } "UDP" { return T_UDP; } +"TCP" { return T_TCP; } "HashSize" { return T_HASHSIZE; } "RefreshTime" { return T_REFRESH; } "CacheTimeout" { return T_EXPIRE; } diff --git a/src/read_config_yy.y b/src/read_config_yy.y index 38c5929..0804689 100644 --- a/src/read_config_yy.y +++ b/src/read_config_yy.y @@ -58,7 +58,7 @@ static void __max_dedicated_links_reached(void); %token T_IPV4_ADDR T_IPV4_IFACE T_PORT T_HASHSIZE T_HASHLIMIT T_MULTICAST %token T_PATH T_UNIX T_REFRESH T_IPV6_ADDR T_IPV6_IFACE %token T_IGNORE_UDP T_IGNORE_ICMP T_IGNORE_TRAFFIC T_BACKLOG T_GROUP -%token T_LOG T_UDP T_ICMP T_IGMP T_VRRP T_IGNORE_PROTOCOL +%token T_LOG T_UDP T_ICMP T_IGMP T_VRRP T_TCP T_IGNORE_PROTOCOL %token T_LOCK T_STRIP_NAT T_BUFFER_SIZE_MAX_GROWN T_EXPIRE T_TIMEOUT %token T_GENERAL T_SYNC T_STATS T_RELAX_TRANSITIONS T_BUFFER_SIZE T_DELAY %token T_SYNC_MODE T_LISTEN_TO T_FAMILY T_RESEND_BUFFER_SIZE @@ -573,6 +573,142 @@ udp_option: T_CHECKSUM T_OFF conf.channel[conf.channel_num].u.udp.checksum = 1; }; +tcp_line : T_TCP '{' tcp_options '}' +{ + if (conf.channel_type_global != CHANNEL_NONE && + conf.channel_type_global != CHANNEL_TCP) { + print_err(CTD_CFG_ERROR, "cannot use `TCP' with other " + "dedicated link protocols!"); + exit(EXIT_FAILURE); + } + conf.channel_type_global = CHANNEL_TCP; + conf.channel[conf.channel_num].channel_type = CHANNEL_TCP; + conf.channel[conf.channel_num].channel_flags = CHANNEL_F_BUFFERED | + CHANNEL_F_STREAM; + conf.channel_num++; +}; + +tcp_line : T_TCP T_DEFAULT '{' tcp_options '}' +{ + if (conf.channel_type_global != CHANNEL_NONE && + conf.channel_type_global != CHANNEL_TCP) { + 
print_err(CTD_CFG_ERROR, "cannot use `TCP' with other " + "dedicated link protocols!"); + exit(EXIT_FAILURE); + } + conf.channel_type_global = CHANNEL_TCP; + conf.channel[conf.channel_num].channel_type = CHANNEL_TCP; + conf.channel[conf.channel_num].channel_flags = CHANNEL_F_DEFAULT | + CHANNEL_F_BUFFERED | + CHANNEL_F_STREAM; + conf.channel_default = conf.channel_num; + conf.channel_num++; +}; + +tcp_options : + | tcp_options tcp_option; + +tcp_option : T_IPV4_ADDR T_IP +{ + __max_dedicated_links_reached(); + + if (!inet_aton($2, &conf.channel[conf.channel_num].u.tcp.server.ipv4)) { + print_err(CTD_CFG_WARN, "%s is not a valid IPv4 address", $2); + break; + } + conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET; +}; + +tcp_option : T_IPV6_ADDR T_IP +{ + __max_dedicated_links_reached(); + +#ifdef HAVE_INET_PTON_IPV6 + if (inet_pton(AF_INET6, $2, + &conf.channel[conf.channel_num].u.tcp.server.ipv6) <= 0) { + print_err(CTD_CFG_WARN, "%s is not a valid IPv6 address", $2); + break; + } +#else + print_err(CTD_CFG_WARN, "cannot find inet_pton(), IPv6 unsupported!"); + break; +#endif + conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET6; +}; + +tcp_option : T_IPV4_DEST_ADDR T_IP +{ + __max_dedicated_links_reached(); + + if (!inet_aton($2, &conf.channel[conf.channel_num].u.tcp.client)) { + print_err(CTD_CFG_WARN, "%s is not a valid IPv4 address", $2); + break; + } + conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET; +}; + +tcp_option : T_IPV6_DEST_ADDR T_IP +{ + __max_dedicated_links_reached(); + +#ifdef HAVE_INET_PTON_IPV6 + if (inet_pton(AF_INET6, $2, + &conf.channel[conf.channel_num].u.tcp.client) <= 0) { + print_err(CTD_CFG_WARN, "%s is not a valid IPv6 address", $2); + break; + } +#else + print_err(CTD_CFG_WARN, "cannot find inet_pton(), IPv6 unsupported!"); + break; +#endif + conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET6; +}; + +tcp_option : T_IFACE T_STRING +{ + int idx; + + __max_dedicated_links_reached(); + 
strncpy(conf.channel[conf.channel_num].channel_ifname, $2, IFNAMSIZ); + + idx = if_nametoindex($2); + if (!idx) { + print_err(CTD_CFG_WARN, "%s is an invalid interface", $2); + break; + } + conf.channel[conf.channel_num].u.tcp.server.ipv6.scope_id = idx; +}; + +tcp_option : T_PORT T_NUMBER +{ + __max_dedicated_links_reached(); + conf.channel[conf.channel_num].u.tcp.port = $2; +}; + +tcp_option: T_SNDBUFF T_NUMBER +{ + __max_dedicated_links_reached(); + conf.channel[conf.channel_num].u.tcp.sndbuf = $2; +}; + +tcp_option: T_RCVBUFF T_NUMBER +{ + __max_dedicated_links_reached(); + conf.channel[conf.channel_num].u.tcp.rcvbuf = $2; +}; + +tcp_option: T_CHECKSUM T_ON +{ + __max_dedicated_links_reached(); + conf.channel[conf.channel_num].u.tcp.checksum = 0; +}; + +tcp_option: T_CHECKSUM T_OFF +{ + __max_dedicated_links_reached(); + conf.channel[conf.channel_num].u.tcp.checksum = 1; +}; + hashsize : T_HASHSIZE T_NUMBER { conf.hashsize = $2; @@ -654,6 +790,7 @@ sync_line: refreshtime | checksum | multicast_line | udp_line + | tcp_line | relax_transitions | delay_destroy_msgs | sync_mode_alarm @@ -1043,6 +1180,25 @@ filter_protocol_item : T_STRING pent->p_proto); }; +filter_protocol_item : T_TCP +{ + struct protoent *pent; + + pent = getprotobyname("tcp"); + if (pent == NULL) { + print_err(CTD_CFG_WARN, "getprotobyname() cannot find " + "protocol `tcp' in /etc/protocols"); + break; + } + ct_filter_add_proto(STATE(us_filter), pent->p_proto); + + __kernel_filter_start(); + + nfct_filter_add_attr_u32(STATE(filter), + NFCT_FILTER_L4PROTO, + pent->p_proto); +}; + filter_item : T_ADDRESS T_ACCEPT '{' filter_address_list '}' { ct_filter_set_logic(STATE(us_filter), diff --git a/src/sync-mode.c b/src/sync-mode.c index 174df80..6781f10 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -98,39 +98,70 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain) } } +static char __net[65536]; /* XXX: maximum MTU for IPv4 */ +static char *cur = __net; + +static int 
channel_stream(struct channel *m, const char *ptr, ssize_t remain) +{ + if (m->channel_flags & CHANNEL_F_STREAM) { + /* truncated data: keep it at the head of __net for the next read; ptr points into __net, so the regions may overlap. */ + memmove(__net, ptr, remain); + cur = __net + remain; + return 1; + } + return 0; +} + /* handler for messages received */ static int channel_handler_routine(struct channel *m, int i) { ssize_t numbytes; - ssize_t remain; - char __net[65536], *ptr = __net; /* XXX: maximum MTU for IPv4 */ + ssize_t remain, pending = cur - __net; + char *ptr = __net; - numbytes = channel_recv(m, __net, sizeof(__net)); + numbytes = channel_recv(m, cur, sizeof(__net) - pending); if (numbytes <= 0) return -1; remain = numbytes; + if (pending) { + remain += pending; + cur = __net; + } + while (remain > 0) { struct nethdr *net = (struct nethdr *) ptr; int len; if (remain < NETHDR_SIZ) { - STATE_SYNC(error).msg_rcv_malformed++; - STATE_SYNC(error).msg_rcv_truncated++; + if (!channel_stream(m, ptr, remain)) { + STATE_SYNC(error).msg_rcv_malformed++; + STATE_SYNC(error).msg_rcv_truncated++; + } break; } len = ntohs(net->len); - if (len > remain || len <= 0) { + if (len <= 0) { STATE_SYNC(error).msg_rcv_malformed++; STATE_SYNC(error).msg_rcv_bad_size++; break; } + if (len > remain) { + if (!channel_stream(m, ptr, remain)) { + STATE_SYNC(error).msg_rcv_malformed++; + STATE_SYNC(error).msg_rcv_bad_size++; + } + break; + } + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { if (remain < NETHDR_ACK_SIZ) { - STATE_SYNC(error).msg_rcv_malformed++; - STATE_SYNC(error).msg_rcv_truncated++; + if (!channel_stream(m, ptr, remain)) { + STATE_SYNC(error).msg_rcv_malformed++; + STATE_SYNC(error).msg_rcv_truncated++; + } break; } @@ -322,15 +353,23 @@ static int init_sync(void) return 0; } +static void channel_check(struct channel *c, int i, fd_set *readfds) +{ + /* In case that this channel is connection-oriented. */ + if (channel_accept_isset(c, readfds)) + channel_accept(c); + + /* For data handling. 
*/ + if (channel_isset(c, readfds)) + channel_handler(c, i); +} + static void run_sync(fd_set *readfds) { int i; - for (i=0; i<STATE_SYNC(channel)->channel_num; i++) { - int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]); - if (FD_ISSET(fd, readfds)) - channel_handler(STATE_SYNC(channel)->channel[i], i); - } + for (i=0; i<STATE_SYNC(channel)->channel_num; i++) + channel_check(STATE_SYNC(channel)->channel[i], i, readfds); if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds)) STATE_SYNC(sync)->xmit(); diff --git a/src/tcp.c b/src/tcp.c new file mode 100644 index 0000000..f99c1cb --- /dev/null +++ b/src/tcp.c @@ -0,0 +1,440 @@ +/* + * (C) 2009 by Pablo Neira Ayuso + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * TCP support has been sponsored by 6WIND . + */ + +#include "tcp.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "conntrackd.h" +#include "fds.h" + +struct tcp_sock *tcp_server_create(struct tcp_conf *c) +{ + int yes = 1, ret; + struct tcp_sock *m; + socklen_t socklen = sizeof(int); + + m = calloc(sizeof(struct tcp_sock), 1); + if (m == NULL) + return NULL; + + switch(c->ipproto) { + case AF_INET: + m->addr.ipv4.sin_family = AF_INET; + m->addr.ipv4.sin_port = htons(c->port); + m->addr.ipv4.sin_addr = c->server.ipv4.inet_addr; + m->sockaddr_len = sizeof(struct sockaddr_in); + break; + + case AF_INET6: + m->addr.ipv6.sin6_family = AF_INET6; + m->addr.ipv6.sin6_port = htons(c->port); + m->addr.ipv6.sin6_addr = c->server.ipv6.inet_addr6; + m->addr.ipv6.sin6_scope_id = c->server.ipv6.scope_id; + m->sockaddr_len = sizeof(struct sockaddr_in6); + break; + } + + m->fd = socket(c->ipproto, SOCK_STREAM, 0); + if (m->fd == -1) { + free(m); + return NULL; + } + + if (setsockopt(m->fd, SOL_SOCKET, SO_REUSEADDR, &yes, 
sizeof(int)) == -1) { + close(m->fd); + free(m); + return NULL; + } + + if (setsockopt(m->fd, SOL_SOCKET, SO_KEEPALIVE, &yes, + sizeof(int)) == -1) { + close(m->fd); + free(m); + return NULL; + } + +#ifndef SO_RCVBUFFORCE +#define SO_RCVBUFFORCE 33 +#endif + + if (c->rcvbuf && + setsockopt(m->fd, SOL_SOCKET, SO_RCVBUFFORCE, &c->rcvbuf, + sizeof(int)) == -1) { + /* not supported in linux kernel < 2.6.14 */ + if (errno != ENOPROTOOPT) { + close(m->fd); + free(m); + return NULL; + } + } + + getsockopt(m->fd, SOL_SOCKET, SO_RCVBUF, &c->rcvbuf, &socklen); + + if (bind(m->fd, (struct sockaddr *) &m->addr, m->sockaddr_len) == -1) { + close(m->fd); + free(m); + return NULL; + } + + if (listen(m->fd, 1) == -1) { + close(m->fd); + free(m); + return NULL; + } + + if (fcntl(m->fd, F_SETFL, O_NONBLOCK) == -1) { + close(m->fd); + free(m); + return NULL; + } + + /* now we accept new connections ... */ + ret = accept(m->fd, NULL, NULL); + if (ret == -1) { + if (errno != EAGAIN) { + /* unexpected error, give up. */ + close(m->fd); + free(m); + m = NULL; + } else { + /* still in progress ... we'll do it in tcp_recv() */ + m->state = TCP_SERVER_ACCEPTING; + } + } else { + /* very unlikely at this stage. */ + if (fcntl(ret, F_SETFL, O_NONBLOCK) == -1) { + /* unexpected error, give up. 
*/ + close(m->fd); + free(m); + return NULL; + } + m->client_fd = ret; + m->state = TCP_SERVER_CONNECTED; + register_fd(m->client_fd, STATE(fds)); + } + + return m; +} + +void tcp_server_destroy(struct tcp_sock *m) +{ + close(m->fd); + free(m); +} + +static int +tcp_client_init(struct tcp_sock *m, struct tcp_conf *c) +{ + int ret = 0; + socklen_t socklen = sizeof(int); + + m->fd = socket(c->ipproto, SOCK_STREAM, 0); + if (m->fd == -1) + return -1; + + if (setsockopt(m->fd, SOL_SOCKET, SO_NO_CHECK, &c->checksum, + sizeof(int)) == -1) { + close(m->fd); + return -1; + } + +#ifndef SO_SNDBUFFORCE +#define SO_SNDBUFFORCE 32 +#endif + + if (c->sndbuf && + setsockopt(m->fd, SOL_SOCKET, SO_SNDBUFFORCE, &c->sndbuf, + sizeof(int)) == -1) { + /* not supported in linux kernel < 2.6.14 */ + if (errno != ENOPROTOOPT) { + close(m->fd); + return -1; + } + } + + getsockopt(m->fd, SOL_SOCKET, SO_SNDBUF, &c->sndbuf, &socklen); + + switch(c->ipproto) { + case AF_INET: + m->addr.ipv4.sin_family = AF_INET; + m->addr.ipv4.sin_port = htons(c->port); + m->addr.ipv4.sin_addr = c->client.inet_addr; + m->sockaddr_len = sizeof(struct sockaddr_in); + break; + case AF_INET6: + m->addr.ipv6.sin6_family = AF_INET6; + m->addr.ipv6.sin6_port = htons(c->port); + memcpy(&m->addr.ipv6.sin6_addr, &c->client.inet_addr6, + sizeof(struct in6_addr)); + m->sockaddr_len = sizeof(struct sockaddr_in6); + break; + default: + ret = -1; + break; + } + + if (ret == -1) { + close(m->fd); + return -1; + } + + if (fcntl(m->fd, F_SETFL, O_NONBLOCK) == -1) { + close(m->fd); + return -1; + } + + ret = connect(m->fd, (struct sockaddr *)&m->addr, m->sockaddr_len); + if (ret == -1) { + if (errno == EINPROGRESS) { + /* connection in progress ... */ + m->state = TCP_CLIENT_DISCONNECTED; + } else if (errno == ECONNREFUSED) { + /* connection refused. */ + m->state = TCP_CLIENT_DISCONNECTED; + } else { + /* unexpected error, give up. */ + close(m->fd); + return -1; + } + } else { + /* very unlikely at this stage. 
*/ + m->state = TCP_CLIENT_CONNECTED; + } + return 0; +} + +static struct tcp_conf *tcp_client_conf; /* XXX: need this to re-connect. */ + +struct tcp_sock *tcp_client_create(struct tcp_conf *c) +{ + struct tcp_sock *m; + + tcp_client_conf = c; + + m = calloc(sizeof(struct tcp_sock), 1); + if (m == NULL) + return NULL; + + if (tcp_client_init(m, c) == -1) { + free(m); + return NULL; + } + + return m; +} + +void tcp_client_destroy(struct tcp_sock *m) +{ + close(m->fd); + free(m); +} + +int tcp_accept(struct tcp_sock *m) +{ + int ret; + + /* we got an attempt to connect but we already have a client? */ + if (m->state != TCP_SERVER_ACCEPTING) { + /* clear the session and restart ... */ + unregister_fd(m->client_fd, STATE(fds)); + close(m->client_fd); + m->client_fd = -1; + m->state = TCP_SERVER_ACCEPTING; + } + + /* the other peer wants to connect ... */ + ret = accept(m->fd, NULL, NULL); + if (ret == -1) { + if (errno != EAGAIN) { + /* unexpected error. Give us another try. */ + m->state = TCP_SERVER_ACCEPTING; + } else { + /* waiting for new connections. */ + m->state = TCP_SERVER_ACCEPTING; + } + } else { + /* the peer finally got connected. */ + if (fcntl(ret, F_SETFL, O_NONBLOCK) == -1) { + /* close the connection and give us another chance. */ + close(ret); + return -1; + } + + m->client_fd = ret; + m->state = TCP_SERVER_CONNECTED; + register_fd(m->client_fd, STATE(fds)); + } + return m->client_fd; +} + +ssize_t tcp_send(struct tcp_sock *m, const void *data, int size) +{ + ssize_t ret = 0; + + switch(m->state) { + case TCP_CLIENT_DISCONNECTED: + ret = connect(m->fd, (struct sockaddr *)&m->addr, + m->sockaddr_len); + if (ret == -1) { + if (errno == EINPROGRESS || errno == EALREADY) { + /* connection in progress or already trying. */ + m->state = TCP_CLIENT_DISCONNECTED; + } else if (errno == ECONNREFUSED) { + /* connection refused. */ + m->state = TCP_CLIENT_DISCONNECTED; + } else { + /* unexpected error, give up. 
*/ + m->state = TCP_CLIENT_DISCONNECTED; + } + break; + } else { + /* we got connected :) */ + m->state = TCP_CLIENT_CONNECTED; + } + case TCP_CLIENT_CONNECTED: + ret = sendto(m->fd, data, size, 0, + (struct sockaddr *) &m->addr, m->sockaddr_len); + if (ret == -1) { + if (errno == EPIPE || errno == ECONNRESET) { + close(m->fd); + tcp_client_init(m, tcp_client_conf); + m->state = TCP_CLIENT_DISCONNECTED; + } else { + m->stats.error++; + return 0; + } + } + } + + if (ret >= 0) { + m->stats.bytes += ret; + m->stats.messages++; + } + return ret; +} + +ssize_t tcp_recv(struct tcp_sock *m, void *data, int size) +{ + ssize_t ret = 0; + socklen_t sin_size = sizeof(struct sockaddr_in); + + /* we are not connected, skip. */ + if (m->state != TCP_SERVER_CONNECTED) + return 0; + + ret = recvfrom(m->client_fd, data, size, 0, + (struct sockaddr *)&m->addr, &sin_size); + if (ret == -1) { + /* the other peer has disconnected... */ + if (errno == ENOTCONN) { + unregister_fd(m->client_fd, STATE(fds)); + close(m->client_fd); + m->client_fd = -1; + m->state = TCP_SERVER_ACCEPTING; + tcp_accept(m); + } else if (errno != EAGAIN) { + m->stats.error++; + } + } else if (ret == 0) { + /* the other peer has closed the connection... */ + unregister_fd(m->client_fd, STATE(fds)); + close(m->client_fd); + m->client_fd = -1; + m->state = TCP_SERVER_ACCEPTING; + tcp_accept(m); + } + + if (ret >= 0) { + m->stats.bytes += ret; + m->stats.messages++; + } + return ret; +} + +int tcp_get_fd(struct tcp_sock *m) +{ + return m->fd; +} + +int tcp_isset(struct tcp_sock *m, fd_set *readfds) +{ + return m->client_fd >= 0 ? 
FD_ISSET(m->client_fd, readfds) : 0; +} + +int tcp_accept_isset(struct tcp_sock *m, fd_set *readfds) +{ + return FD_ISSET(m->fd, readfds); +} + +int +tcp_snprintf_stats(char *buf, size_t buflen, char *ifname, + struct tcp_sock *client, struct tcp_sock *server) +{ + size_t size; + struct tcp_stats *s = &client->stats, *r = &server->stats; + + size = snprintf(buf, buflen, "TCP traffic (active device=%s) " + "server=%s client=%s:\n" + "%20llu Bytes sent " + "%20llu Bytes recv\n" + "%20llu Pckts sent " + "%20llu Pckts recv\n" + "%20llu Error send " + "%20llu Error recv\n\n", + ifname, + server->state == TCP_SERVER_CONNECTED ? + "connected" : "disconnected", + client->state == TCP_CLIENT_CONNECTED ? + "connected" : "disconnected", + (unsigned long long)s->bytes, + (unsigned long long)r->bytes, + (unsigned long long)s->messages, + (unsigned long long)r->messages, + (unsigned long long)s->error, + (unsigned long long)r->error); + return size; +} + +int +tcp_snprintf_stats2(char *buf, size_t buflen, const char *ifname, + const char *status, int active, + struct tcp_stats *s, struct tcp_stats *r) +{ + size_t size; + + size = snprintf(buf, buflen, + "TCP traffic device=%s status=%s role=%s:\n" + "%20llu Bytes sent " + "%20llu Bytes recv\n" + "%20llu Pckts sent " + "%20llu Pckts recv\n" + "%20llu Error send " + "%20llu Error recv\n\n", + ifname, status, active ? "ACTIVE" : "BACKUP", + (unsigned long long)s->bytes, + (unsigned long long)r->bytes, + (unsigned long long)s->messages, + (unsigned long long)r->messages, + (unsigned long long)s->error, + (unsigned long long)r->error); + return size; +} diff --git a/src/udp.c b/src/udp.c index 4b9eb80..ecaa46e 100644 --- a/src/udp.c +++ b/src/udp.c @@ -214,6 +214,11 @@ int udp_get_fd(struct udp_sock *m) return m->fd; } +int udp_isset(struct udp_sock *m, fd_set *readfds) +{ + return FD_ISSET(m->fd, readfds); +} + int udp_snprintf_stats(char *buf, size_t buflen, char *ifname, struct udp_stats *s, struct udp_stats *r) -- cgit v1.2.3