summaryrefslogtreecommitdiffstats
path: root/src/sync-mode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r--src/sync-mode.c65
1 files changed, 52 insertions, 13 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 174df80..6781f10 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -98,39 +98,70 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain)
}
}
+static char __net[65536]; /* XXX: maximum MTU for IPv4 */
+static char *cur = __net;
+
+static int channel_stream(struct channel *m, const char *ptr, ssize_t remain)
+{
+ if (m->channel_flags & CHANNEL_F_STREAM) {
+ /* truncated data. */
+ memcpy(__net, ptr, remain);
+ cur = __net + remain;
+ return 1;
+ }
+ return 0;
+}
+
/* handler for messages received */
static int channel_handler_routine(struct channel *m, int i)
{
ssize_t numbytes;
- ssize_t remain;
- char __net[65536], *ptr = __net; /* XXX: maximum MTU for IPv4 */
+ ssize_t remain, pending = cur - __net;
+ char *ptr = __net;
- numbytes = channel_recv(m, __net, sizeof(__net));
+ numbytes = channel_recv(m, cur, sizeof(__net) - pending);
if (numbytes <= 0)
return -1;
remain = numbytes;
+ if (pending) {
+ remain += pending;
+ cur = __net;
+ }
+
while (remain > 0) {
struct nethdr *net = (struct nethdr *) ptr;
int len;
if (remain < NETHDR_SIZ) {
- STATE_SYNC(error).msg_rcv_malformed++;
- STATE_SYNC(error).msg_rcv_truncated++;
+ if (!channel_stream(m, ptr, remain)) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_truncated++;
+ }
break;
}
len = ntohs(net->len);
- if (len > remain || len <= 0) {
+ if (len <= 0) {
STATE_SYNC(error).msg_rcv_malformed++;
STATE_SYNC(error).msg_rcv_bad_size++;
break;
}
+ if (len > remain) {
+ if (!channel_stream(m, ptr, remain)) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_bad_size++;
+ }
+ break;
+ }
+
if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
if (remain < NETHDR_ACK_SIZ) {
- STATE_SYNC(error).msg_rcv_malformed++;
- STATE_SYNC(error).msg_rcv_truncated++;
+ if (!channel_stream(m, ptr, remain)) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_truncated++;
+ }
break;
}
@@ -322,15 +353,23 @@ static int init_sync(void)
return 0;
}
+static void channel_check(struct channel *c, int i, fd_set *readfds)
+{
+ /* In case that this channel is connection-oriented. */
+ if (channel_accept_isset(c, readfds))
+ channel_accept(c);
+
+ /* For data handling. */
+ if (channel_isset(c, readfds))
+ channel_handler(c, i);
+}
+
static void run_sync(fd_set *readfds)
{
int i;
- for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
- int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]);
- if (FD_ISSET(fd, readfds))
- channel_handler(STATE_SYNC(channel)->channel[i], i);
- }
+ for (i=0; i<STATE_SYNC(channel)->channel_num; i++)
+ channel_check(STATE_SYNC(channel)->channel[i], i, readfds);
if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds))
STATE_SYNC(sync)->xmit();