about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard Nyberg <rnyberg@murmeldjur.se>2005-10-04 17:52:56 +0000
committerRichard Nyberg <rnyberg@murmeldjur.se>2005-10-04 17:52:56 +0000
commit32a88ff5d8ba98700ef5c52d3b689ab00718fe5c (patch)
tree42d92647af4414e76bc373759c9e346304fad8aa
parentd5bf714f1d8ed1e57fcdfc4d41de420fde2db54c (diff)
downloadbtpd-32a88ff5d8ba98700ef5c52d3b689ab00718fe5c.tar.gz
btpd-32a88ff5d8ba98700ef5c52d3b689ab00718fe5c.zip
Rewrite of the code for receiving data from peers.
It's not quite how I want it yet, but it's getting there.

-rw-r--r--btpd/net.c650
-rw-r--r--btpd/net.h42
-rw-r--r--btpd/peer.c14
-rw-r--r--btpd/peer.h7
4 files changed, 265 insertions, 448 deletions
diff --git a/btpd/net.c b/btpd/net.c
index 6435064..90f1203 100644
--- a/btpd/net.c
+++ b/btpd/net.c
@@ -18,43 +18,11 @@
 
 #include "btpd.h"
 
-#define min(x, y) ((x) <= (y) ? (x) : (y))
-
-static unsigned long
-net_write(struct peer *p, unsigned long wmax);
-
-void
-net_read_cb(int sd, short type, void *arg)
-{
-    struct peer *p = (struct peer *)arg;
-    if (btpd.ibwlim == 0) {
-	p->reader->read(p, 0);
-    } else if (btpd.ibw_left > 0) {
-	btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
-    } else {
-	p->flags |= PF_ON_READQ;
-	BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
-    }
-}
+#ifndef IOV_MAX
+#define IOV_MAX 1024
+#endif
 
-void
-net_write_cb(int sd, short type, void *arg)
-{
-    struct peer *p = (struct peer *)arg;
-    if (type == EV_TIMEOUT) {
-	btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
-	peer_kill(p);
-	return;
-    }
-    if (btpd.obwlim == 0) {
-	net_write(p, 0);
-    } else if (btpd.obw_left > 0) {
-	btpd.obw_left -= net_write(p, btpd.obw_left);
-    } else {
-	p->flags |= PF_ON_WRITEQ;
-	BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
-    }
-}
+#define min(x, y) ((x) <= (y) ? (x) : (y))
 
 void
 net_write32(void *buf, uint32_t num)
@@ -68,19 +36,11 @@ net_read32(void *buf)
     return ntohl(*(uint32_t *)buf);
 }
 
-void
-kill_shake(struct input_reader *reader)
-{
-    free(reader);
-}
-
-#define NIOV 16
-
 static unsigned long
 net_write(struct peer *p, unsigned long wmax)
 {
     struct nb_link *nl;
-    struct iovec iov[NIOV];
+    struct iovec iov[IOV_MAX];
     int niov;
     int limited;
     ssize_t nwritten;
@@ -90,7 +50,8 @@ net_write(struct peer *p, unsigned long wmax)
 
     niov = 0;
     assert((nl = BTPDQ_FIRST(&p->outq)) != NULL);
-    while (niov < NIOV && nl != NULL && (!limited || (limited && wmax > 0))) {
+    while ((niov < IOV_MAX && nl != NULL
+	       && (!limited || (limited && wmax > 0)))) {
 	if (niov > 0) {
 	    iov[niov].iov_base = nl->nb->buf;
 	    iov[niov].iov_len = nl->nb->len;
@@ -159,363 +120,136 @@ net_write(struct peer *p, unsigned long wmax)
     return nwritten;
 }
 
-static void
-kill_generic(struct input_reader *reader)
-{
-    free(reader);
-}
-
-static size_t
-net_read(struct peer *p, char *buf, size_t len)
-{
-    ssize_t nread = read(p->sd, buf, len);
-    if (nread < 0) {
-	if (errno == EAGAIN) {
-	    event_add(&p->in_ev, NULL);
-	    return 0;
-	} else {
-	    btpd_log(BTPD_L_CONN, "read error: %s\n", strerror(errno));
-	    peer_kill(p);
-	    return 0;
-	}
-    } else if (nread == 0) {
-	btpd_log(BTPD_L_CONN, "conn closed by other side.\n");
-	if (!BTPDQ_EMPTY(&p->outq))
-	    p->flags |= PF_WRITE_CLOSE;
-	else
-	    peer_kill(p);
-	return 0;
-    } else 
-	return nread;
-}
-
-static size_t
-net_read_to_buf(struct peer *p, struct io_buffer *iob, unsigned long rmax)
-{
-    if (rmax == 0)
-	rmax = iob->buf_len - iob->buf_off;
-    else
-	rmax = min(rmax, iob->buf_len - iob->buf_off);
-
-    assert(rmax > 0);
-    size_t nread = net_read(p, iob->buf + iob->buf_off, rmax);
-    if (nread > 0)
-	iob->buf_off += nread;
-    return nread;
-}
-
 void
-kill_bitfield(struct input_reader *rd)
+net_set_state(struct peer *p, int state, size_t size)
 {
-    free(rd);
+    p->net_state = state;
+    p->state_bytes = size;
 }
 
-static void net_generic_reader(struct peer *p);
-
-static unsigned long
-read_bitfield(struct peer *p, unsigned long rmax)
+static int
+net_dispatch_msg(struct peer *p, uint32_t mlen, uint8_t mnum, uint8_t *buf)
 {
-    struct bitfield_reader *rd = (struct bitfield_reader *)p->reader;
-
-    size_t nread = net_read_to_buf(p, &rd->iob, rmax);
-    if (nread == 0)
-	return 0;
-
-    if (rd->iob.buf_off == rd->iob.buf_len) {
-	peer_on_bitfield(p, rd->iob.buf);
-	free(rd);
-	net_generic_reader(p);
-    } else
-	event_add(&p->in_ev, NULL);
-
-    return nread;
-}
-
-void
-kill_piece(struct input_reader *rd)
-{
-    free(rd);
+    uint32_t index, begin, length;
+    int res = 0;
+
+    switch (mnum) {
+    case MSG_CHOKE:
+	peer_on_choke(p);
+	break;
+    case MSG_UNCHOKE:
+	peer_on_unchoke(p);
+	break;
+    case MSG_INTEREST:
+	peer_on_interest(p);
+	break;
+    case MSG_UNINTEREST:
+	peer_on_uninterest(p);
+	break;
+    case MSG_HAVE:
+	peer_on_have(p, net_read32(buf + 5));
+	break;
+    case MSG_BITFIELD:
+	peer_on_bitfield(p, buf + 5);
+	break;
+    case MSG_REQUEST:
+	if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
+	    index = net_read32(buf + 5);
+	    begin = net_read32(buf + 9);
+	    length = net_read32(buf + 13);
+	    if ((length > PIECE_BLOCKLEN
+		    || index >= p->tp->meta.npieces
+		    || !has_bit(p->tp->piece_field, index)
+		    || begin + length < torrent_piece_size(p->tp, index))) {
+		res = 1;
+		break;
+	    }
+	    peer_on_request(p, index, begin, length);
+	}
+	break;
+    case MSG_CANCEL:
+	index = net_read32(buf + 5);
+	begin = net_read32(buf + 9);
+	length = net_read32(buf + 13);
+	peer_on_cancel(p, index, begin, length);
+	break;
+    case MSG_PIECE:
+	index = net_read32(buf + 5);
+	begin = net_read32(buf + 9);
+	length = mlen - 9;
+	peer_on_piece(p, index, begin, length, buf + 13);
+	break;
+    default:
+	abort();
+    }
+    return res;
 }
 
-static unsigned long
-read_piece(struct peer *p, unsigned long rmax)
+static int
+net_mh_ok(struct peer *p, uint32_t mlen, uint8_t mnum)
 {
-    struct piece_reader *rd = (struct piece_reader *)p->reader;
-
-    size_t nread = net_read_to_buf(p, &rd->iob, rmax);
-    if (nread == 0)
+    switch (mnum) {
+    case MSG_CHOKE:
+    case MSG_UNCHOKE:
+    case MSG_INTEREST:
+    case MSG_UNINTEREST:
+	return mlen == 1;
+    case MSG_HAVE:
+	return mlen == 5;
+    case MSG_BITFIELD:
+	return mlen == (uint32_t)ceil(p->tp->meta.npieces / 8.0) + 1;
+    case MSG_REQUEST:
+    case MSG_CANCEL:
+	return mlen == 13;
+    case MSG_PIECE:
+	return mlen <= PIECE_BLOCKLEN + 9;
+    default:
 	return 0;
-
-    p->rate_to_me[btpd.seconds % RATEHISTORY] += nread;
-    p->tp->downloaded += nread;
-    if (rd->iob.buf_off == rd->iob.buf_len) {
-	peer_on_piece(p, rd->index, rd->begin, rd->iob.buf_len, rd->iob.buf);
-	free(rd);
-	net_generic_reader(p);
-    } else
-	event_add(&p->in_ev, NULL);
-
-    return nread;
-}
-
-#define GRBUFLEN (1 << 15)
-
-static unsigned long
-net_generic_read(struct peer *p, unsigned long rmax)
-{
-    char buf[GRBUFLEN];
-    struct io_buffer iob = { 0, GRBUFLEN, buf };
-    struct generic_reader *gr = (struct generic_reader *)p->reader;
-    size_t nread;
-    size_t off, len;
-    int got_part;
-
-    if (gr->iob.buf_off > 0) {
-	iob.buf_off = gr->iob.buf_off;
-	bcopy(gr->iob.buf, iob.buf, iob.buf_off);
-	gr->iob.buf_off = 0;
     }
-    
-    if ((nread = net_read_to_buf(p, &iob, rmax)) == 0)
-	return 0;
-
-    len = iob.buf_off;
-    off = 0;
-
-    got_part = 0;
-    while (!got_part && len - off >= 4) {
-	size_t msg_len = net_read32(buf + off);
-
-	if (msg_len == 0) {	/* Keep alive */
-	    off += 4;
-	    continue;
-	}
-	if (len - off < 5) {
-	    got_part = 1;
-	    break;
-	}
-
-	switch (buf[off + 4]) {
-	case MSG_CHOKE:
-	    if (msg_len != 1)
-		goto bad_data;
-	    peer_on_choke(p);
-	    break;
-	case MSG_UNCHOKE:
-	    if (msg_len != 1)
-		goto bad_data;
-	    peer_on_unchoke(p);
-	    break;
-	case MSG_INTEREST:
-	    if (msg_len != 1)
-		goto bad_data;
-	    peer_on_interest(p);
-	    break;
-	case MSG_UNINTEREST:
-	    if (msg_len != 1)
-		goto bad_data;
-	    peer_on_uninterest(p);
-	    break;
-	case MSG_HAVE:
-	    if (msg_len != 5)
-		goto bad_data;
-	    else if (len - off >= msg_len + 4) {
-		uint32_t index = net_read32(buf + off + 5);
-		peer_on_have(p, index);
-	    } else
-		got_part = 1;
-	    break;
-	case MSG_BITFIELD:
-	    if (msg_len != (size_t)ceil(p->tp->meta.npieces / 8.0) + 1)
-		goto bad_data;
-	    else if (p->npieces != 0)
-		goto bad_data;
-	    else if (len - off >= msg_len + 4)
-		peer_on_bitfield(p, buf + off + 5);
-	    else {
-		struct bitfield_reader *rp;
-		size_t mem = sizeof(*rp) + msg_len - 1;
-		p->reader->kill(p->reader);
-		rp = btpd_calloc(1, mem);
-		rp->rd.kill = kill_bitfield;
-		rp->rd.read = read_bitfield;
-		rp->iob.buf = (char *)rp + sizeof(*rp);
-		rp->iob.buf_len = msg_len - 1;
-		rp->iob.buf_off = len - off - 5;
-		bcopy(buf + off + 5, rp->iob.buf, rp->iob.buf_off);
-		p->reader = (struct input_reader *)rp;
-		event_add(&p->in_ev, NULL);
-		return nread;
-	    }
-	    break;
-	case MSG_REQUEST:
-	    if (msg_len != 13)
-		goto bad_data;
-	    else if (len - off >= msg_len + 4) {
-		if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) != PF_P_WANT)
-		    break;
-		uint32_t index, begin, length;
-		index = net_read32(buf + off + 5);
-		begin = net_read32(buf + off + 9);
-		length = net_read32(buf + off + 13);
-		if (length > (1 << 15))
-		    goto bad_data;
-		if (index >= p->tp->meta.npieces)
-		    goto bad_data;
-		if (!has_bit(p->tp->piece_field, index))
-		    goto bad_data;
-		if (begin + length > torrent_piece_size(p->tp, index))
-		    goto bad_data;
-		peer_on_request(p, index, begin, length);
-	    } else
-		got_part = 1;
-	    break;
-	case MSG_PIECE:
-	    if (msg_len < 10)
-		goto bad_data;
-	    else if (len - off >= 13) {
-		uint32_t index = net_read32(buf + off + 5);
-		uint32_t begin = net_read32(buf + off + 9);
-		uint32_t length = msg_len - 9;
-		if (len - off >= msg_len + 4) {
-		    p->tp->downloaded += length;
-		    p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
-		    peer_on_piece(p, index, begin, length, buf + off + 13);
-		} else {
-		    struct piece_reader *rp;
-		    size_t mem = sizeof(*rp) + length;
-		    p->reader->kill(p->reader);
-		    rp = btpd_calloc(1, mem);
-		    rp->rd.kill = kill_piece;
-		    rp->rd.read = read_piece;
-		    rp->index = index;
-		    rp->begin = begin;
-		    rp->iob.buf = (char *)rp + sizeof(*rp);
-		    rp->iob.buf_len = length;
-		    rp->iob.buf_off = len - off - 13;
-		    bcopy(buf + off + 13, rp->iob.buf, rp->iob.buf_off);
-		    p->reader = (struct input_reader *)rp;
-		    event_add(&p->in_ev, NULL);
-		    p->tp->downloaded += rp->iob.buf_off;
-		    p->rate_to_me[btpd.seconds % RATEHISTORY] +=
-			rp->iob.buf_off;
-		    return nread;
-		}
-	    } else
-		got_part = 1;
-	    break;
-	case MSG_CANCEL:
-	    if (msg_len != 13)
-		goto bad_data;
-	    else if (len - off >= msg_len + 4) {
-		uint32_t index = net_read32(buf + off + 5);
-		uint32_t begin = net_read32(buf + off + 9);
-		uint32_t length = net_read32(buf + off + 13);
-		if (index > p->tp->meta.npieces)
-		    goto bad_data;
-		if (begin + length > torrent_piece_size(p->tp, index))
-		    goto bad_data;
-		peer_on_cancel(p, index, begin, length);
-	    } else
-		got_part = 1;
-	    break;
-	default:
-	    goto bad_data;
-	}
-	if (!got_part)
-	    off += 4 + msg_len;
-    }
-    if (off != len) {
-	gr->iob.buf_off = len - off;
-        assert(gr->iob.buf_off <= gr->iob.buf_len);
-	bcopy(buf + off, gr->iob.buf, gr->iob.buf_off);
-    }
-    event_add(&p->in_ev, NULL);
-    return nread;
-
-bad_data:
-    btpd_log(BTPD_L_MSG, "received bad data from %p\n", p);
-    peer_kill(p);
-    return nread;
 }
 
 static void
-net_generic_reader(struct peer *p)
+net_progress(struct peer *p, size_t length)
 {
-    struct generic_reader *gr;
-    gr = btpd_calloc(1, sizeof(*gr));
-
-    gr->rd.read = net_generic_read;
-    gr->rd.kill = kill_generic;
-
-    gr->iob.buf = gr->_io_buf;
-    gr->iob.buf_len = MAX_INPUT_LEFT;
-    gr->iob.buf_off = 0;
-
-    p->reader = (struct input_reader *)gr;
-
-    event_add(&p->in_ev, NULL);
+    if (p->net_state == NET_MSGPIECE) {
+	p->tp->downloaded += length;
+	p->rate_to_me[btpd.seconds % RATEHISTORY] += length;
+    }
 }
 
-static unsigned long
-net_shake_read(struct peer *p, unsigned long rmax)
+static int
+net_state_foo(struct peer *p, struct io_buffer *iob)
 {
-    struct handshake *hs = (struct handshake *)p->reader;
-    struct io_buffer *in = &hs->in;
+    uint32_t mlen;
+    uint32_t mnum;
 
-    size_t nread = net_read_to_buf(p, in, rmax);
-    if (nread == 0)
-	return 0;
-
-    switch (hs->state) {
-    case SHAKE_INIT:
-	if (in->buf_off < 20)
-	    break;
-	else if (bcmp(in->buf, "\x13""BitTorrent protocol", 20) == 0)
-	    hs->state = SHAKE_PSTR;
-	else
-	    goto bad_shake;
+    switch (p->net_state) {
     case SHAKE_PSTR:
-	if (in->buf_off < 28)
-	    break;
-	else
-	    hs->state = SHAKE_RESERVED;
-    case SHAKE_RESERVED:
-	if (in->buf_off < 48)
-	    break;
-	else if (hs->incoming) {
-	    struct torrent *tp = torrent_get_by_hash(in->buf + 28);
-	    if (tp != NULL) {
-		hs->state = SHAKE_INFO;
-		p->tp = tp;
-		peer_send(p, nb_create_shake(p->tp));
-	    } else
-		goto bad_shake;
-	} else {
-	    if (bcmp(in->buf + 28, p->tp->meta.info_hash, 20) == 0)
-		hs->state = SHAKE_INFO;
-	    else
-		goto bad_shake;
-	}
+	assert(iob->buf_len >= 28);
+	if (bcmp(iob->buf, "\x13""BitTorrent protocol", 20) != 0)
+	    goto bad;
+	net_set_state(p, SHAKE_INFO, 20);
+	return 28;
     case SHAKE_INFO:
-	if (in->buf_off < 68)
-	    break;
-	else {
-	    if (torrent_has_peer(p->tp, in->buf + 48))
-		goto bad_shake; // Not really, but we're already connected.
-	    else if (bcmp(in->buf + 48, btpd.peer_id, 20) == 0)
-		goto bad_shake; // Connection from myself.
-	    bcopy(in->buf + 48, p->id, 20);
-	    hs->state = SHAKE_ID;
-	}
-    default:
-	assert(hs->state == SHAKE_ID);
-    }
-    if (hs->state == SHAKE_ID) {
+	assert(iob->buf_len >= 20);
+	if (p->flags & PF_INCOMING) {
+	    struct torrent *tp = torrent_get_by_hash(iob->buf);
+	    if (tp == NULL)
+		goto bad;
+	    p->tp = tp;
+	    peer_send(p, nb_create_shake(p->tp));
+	} else if (bcmp(iob->buf, p->tp->meta.info_hash, 20) != 0)
+	    goto bad;
+	net_set_state(p, SHAKE_ID, 20);
+	return 20;
+    case SHAKE_ID:
+	assert(iob->buf_len >= 20);
+	if ((torrent_has_peer(p->tp, iob->buf)
+		|| bcmp(iob->buf, btpd.peer_id, 20) == 0))
+	    goto bad;
+	bcopy(iob->buf, p->id, 20);
 	btpd_log(BTPD_L_CONN, "Got whole shake.\n");
-	free(hs);
 	p->piece_field = btpd_calloc(1, (int)ceil(p->tp->meta.npieces / 8.0));
-	net_generic_reader(p);
 	if (p->tp->have_npieces > 0) {
 	    if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0))
 		peer_send(p, nb_create_multihave(p->tp));
@@ -525,37 +259,112 @@ net_shake_read(struct peer *p, unsigned long rmax)
 	    }
 	}
 	cm_on_new_peer(p);
-    } else
-	event_add(&p->in_ev, NULL);
-
-    return nread;
+	net_set_state(p, NET_MSGSIZE, 4);
+	return 20;
+    case NET_MSGSIZE:
+	assert(iob->buf_len >= 4);
+	if (bcmp(iob->buf, "\0\0\0\0", 4) == 0)
+	    return 4;
+	else {
+	    net_set_state(p, NET_MSGHEAD, 5);
+	    return 0;
+	}
+    case NET_MSGHEAD:
+	assert(iob->buf_len >= 5);
+	mlen = net_read32(iob->buf);
+	mnum = iob->buf[4];
+	if (!net_mh_ok(p, mlen, mnum)) {
+	    btpd_log(BTPD_L_ERROR, "error in head\n");
+	    goto bad;
+	} else if (mlen == 1) {
+	    if (net_dispatch_msg(p, mlen, mnum, iob->buf) != 0) {
+		btpd_log(BTPD_L_ERROR, "error in dispatch\n");
+		goto bad;
+		}
+	    net_set_state(p, NET_MSGSIZE, 4);
+	    return 5;
+	} else {
+	    uint8_t nstate = mnum == MSG_PIECE ? NET_MSGPIECE : NET_MSGBODY;
+	    net_set_state(p, nstate, mlen + 4);
+	    return 0;
+	}
+    case NET_MSGPIECE:
+    case NET_MSGBODY:
+	mlen = net_read32(iob->buf);
+	mnum = iob->buf[4];
+	assert(iob->buf_len >= mlen + 4);
+	if (net_dispatch_msg(p, mlen, mnum, iob->buf) != 0) {
+	    btpd_log(BTPD_L_ERROR, "error in dispatch\n");
+	    goto bad;
+	}
+	net_set_state(p, NET_MSGSIZE, 4);
+	return mlen + 4;
+    default:
+	abort();
+    }
 
-bad_shake:
-    btpd_log(BTPD_L_CONN, "Bad shake(%d)\n", hs->state);
+bad:
+    btpd_log(BTPD_L_CONN, "bad data.\n");
     peer_kill(p);
-    return nread;
+    return -1;
 }
 
+#define GRBUFLEN (1 << 15)
 
-void
-net_handshake(struct peer *p, int incoming)
+static unsigned long
+net_read(struct peer *p, unsigned long rmax)
 {
-    struct handshake *hs;
+    size_t baggage = p->net_in.buf_len;
+    char buf[GRBUFLEN + baggage];
+    struct io_buffer sbuf = { baggage, sizeof(buf), buf };
+    if (baggage > 0) {
+	bcopy(p->net_in.buf, buf, baggage);
+	free(p->net_in.buf);
+	p->net_in.buf = NULL;
+	p->net_in.buf_off = 0;
+	p->net_in.buf_len = 0;
+    }
+
+    if (rmax > 0)
+	rmax = min(rmax, sbuf.buf_len - sbuf.buf_off);
+    else
+	rmax = sbuf.buf_len - sbuf.buf_off;
+
+    ssize_t nread = read(p->sd, sbuf.buf + sbuf.buf_off, rmax);
+    if (nread < 0 && errno == EAGAIN)
+	goto out;
+    else if (nread < 0) {
+	btpd_log(BTPD_L_CONN, "Read error (%s) on %p.\n", strerror(errno), p);
+	peer_kill(p);
+	return 0;
+    } else if (nread == 0) {
+	btpd_log(BTPD_L_CONN, "Connection closed by %p.\n", p);
+	peer_kill(p);
+	return 0;
+    }
 
-    hs = calloc(1, sizeof(*hs));
-    hs->incoming = incoming;
-    hs->state = SHAKE_INIT;
+    sbuf.buf_len = sbuf.buf_off + nread;
+    sbuf.buf_off = 0;
+    while (p->state_bytes <= sbuf.buf_len) {
+	ssize_t chomped = net_state_foo(p, &sbuf);
+	if (chomped < 0)
+	    return nread;
+	sbuf.buf += chomped;
+	sbuf.buf_len -= chomped;
+	baggage = 0;
+    }
 
-    hs->in.buf_len = SHAKE_LEN;
-    hs->in.buf_off = 0;
-    hs->in.buf = hs->_io_buf;
+    net_progress(p, sbuf.buf_len - baggage);
 
-    p->reader = (struct input_reader *)hs;
-    hs->rd.read = net_shake_read;
-    hs->rd.kill = kill_shake;
+    if (sbuf.buf_len > 0) {
+	p->net_in = sbuf;
+	p->net_in.buf = btpd_malloc(sbuf.buf_len);
+	bcopy(sbuf.buf, p->net_in.buf, sbuf.buf_len);
+    }
 
-    if (!incoming)
-	peer_send(p, nb_create_shake(p->tp));
+out:
+    event_add(&p->in_ev, NULL);
+    return nread > 0 ? nread : 0;
 }
 
 int
@@ -660,13 +469,13 @@ net_bw_cb(int sd, short type, void *arg)
 	while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL && btpd.ibw_left > 0) {
 	    BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
 	    p->flags &= ~PF_ON_READQ;
-	    btpd.ibw_left -= p->reader->read(p, btpd.ibw_left);
+	    btpd.ibw_left -= net_read(p, btpd.ibw_left);
 	}
     } else {
 	while ((p = BTPDQ_FIRST(&btpd.readq)) != NULL) {
 	    BTPDQ_REMOVE(&btpd.readq, p, rq_entry);
 	    p->flags &= ~PF_ON_READQ;
-	    p->reader->read(p, 0);
+	    net_read(p, 0);
 	}
     }
 
@@ -685,3 +494,36 @@ net_bw_cb(int sd, short type, void *arg)
     }
     event_add(&btpd.bwlim, (& (struct timeval) { 0, 1000000 / btpd.bw_hz }));
 }
+
+void
+net_read_cb(int sd, short type, void *arg)
+{
+    struct peer *p = (struct peer *)arg;
+    if (btpd.ibwlim == 0)
+	net_read(p, 0);
+    else if (btpd.ibw_left > 0)
+	btpd.ibw_left -= net_read(p, btpd.ibw_left);
+    else {
+	p->flags |= PF_ON_READQ;
+	BTPDQ_INSERT_TAIL(&btpd.readq, p, rq_entry);
+    }
+}
+
+void
+net_write_cb(int sd, short type, void *arg)
+{
+    struct peer *p = (struct peer *)arg;
+    if (type == EV_TIMEOUT) {
+	btpd_log(BTPD_L_CONN, "Write attempt timed out.\n");
+	peer_kill(p);
+	return;
+    }
+    if (btpd.obwlim == 0) {
+	net_write(p, 0);
+    } else if (btpd.obw_left > 0) {
+	btpd.obw_left -= net_write(p, btpd.obw_left);
+    } else {
+	p->flags |= PF_ON_WRITEQ;
+	BTPDQ_INSERT_TAIL(&btpd.writeq, p, wq_entry);
+    }
+}
diff --git a/btpd/net.h b/btpd/net.h
index 58ea819..fd288f9 100644
--- a/btpd/net.h
+++ b/btpd/net.h
@@ -13,25 +13,6 @@
 
 #define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })
 
-struct peer;
-
-struct input_reader {
-    unsigned long (*read)(struct peer *, unsigned long);
-    void (*kill)(struct input_reader *);
-};
-
-struct bitfield_reader {
-    struct input_reader rd;
-    struct io_buffer iob;
-};
-
-struct piece_reader {
-    struct input_reader rd;
-    struct io_buffer iob;
-    uint32_t index;
-    uint32_t begin;
-};
-
 #define SHAKE_LEN 68
 
 enum shake_state {
@@ -39,24 +20,14 @@ enum shake_state {
     SHAKE_PSTR,
     SHAKE_RESERVED,
     SHAKE_INFO,
-    SHAKE_ID
-};
-
-struct handshake {
-    struct input_reader rd;
-    enum shake_state state;
-    int incoming;
-    struct io_buffer in;
-    char _io_buf[SHAKE_LEN];
+    SHAKE_ID,
+    NET_MSGSIZE,
+    NET_MSGHEAD,
+    NET_MSGBODY,
+    NET_MSGPIECE
 };
 
-#define MAX_INPUT_LEFT 16
-
-struct generic_reader {
-    struct input_reader rd;
-    struct io_buffer iob;
-    char _io_buf[MAX_INPUT_LEFT];
-};
+void net_set_state(struct peer *p, int state, size_t size);
 
 void net_connection_cb(int sd, short type, void *arg);
 void net_bw_rate(void);
@@ -65,7 +36,6 @@ void net_bw_cb(int sd, short type, void *arg);
 void net_read_cb(int sd, short type, void *arg);
 void net_write_cb(int sd, short type, void *arg);
 
-void net_handshake(struct peer *p, int incoming);
 int net_connect2(struct sockaddr *sa, socklen_t salen, int *sd);
 int net_connect(const char *ip, int port, int *sd);
 
diff --git a/btpd/peer.c b/btpd/peer.c
index 572e2cc..e95a7b5 100644
--- a/btpd/peer.c
+++ b/btpd/peer.c
@@ -22,7 +22,7 @@ peer_kill(struct peer *p)
 {
     struct nb_link *nl;
 
-    btpd_log(BTPD_L_CONN, "killed peer.\n");
+    btpd_log(BTPD_L_CONN, "killed peer %p\n", p);
 
     if (p->flags & PF_ATTACHED)
 	cm_on_lost_peer(p);
@@ -45,7 +45,8 @@ peer_kill(struct peer *p)
 	nl = next;
     }
 
-    p->reader->kill(p->reader);
+    if (p->net_in.buf != NULL)
+	free(p->net_in.buf);
     if (p->piece_field != NULL)
         free(p->piece_field);
     free(p);
@@ -66,7 +67,6 @@ peer_send(struct peer *p, struct net_buf *nb)
     BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
 }
 
-
 /*
  * Remove a network buffer from the peer's outq.
  * If a part of the buffer already have been written
@@ -250,6 +250,8 @@ peer_create_common(int sd)
     BTPDQ_INIT(&p->my_reqs);
     BTPDQ_INIT(&p->outq);
 
+    net_set_state(p, SHAKE_PSTR, 28);
+
     event_set(&p->out_ev, p->sd, EV_WRITE, net_write_cb, p);
     event_set(&p->in_ev, p->sd, EV_READ, net_read_cb, p);
     event_add(&p->in_ev, NULL);
@@ -263,7 +265,7 @@ void
 peer_create_in(int sd)
 {
     struct peer *p = peer_create_common(sd);
-    net_handshake(p, 1);
+    p->flags |= PF_INCOMING;
 }
 
 void
@@ -278,7 +280,7 @@ peer_create_out(struct torrent *tp, const uint8_t *id,
 
     p = peer_create_common(sd);
     p->tp = tp;
-    net_handshake(p, 0);
+    peer_send(p, nb_create_shake(p->tp));
 }
 
 void
@@ -297,7 +299,7 @@ peer_create_out_compact(struct torrent *tp, const char *compact)
 
     p = peer_create_common(sd);
     p->tp = tp;
-    net_handshake(p, 0);
+    peer_send(p, nb_create_shake(p->tp));
 }
 
 void
diff --git a/btpd/peer.h b/btpd/peer.h
index a8eb80d..c92f925 100644
--- a/btpd/peer.h
+++ b/btpd/peer.h
@@ -10,6 +10,7 @@
 #define PF_ATTACHED	 0x40
 #define PF_WRITE_CLOSE	 0x80	/* Close connection after writing all data */
 #define PF_NO_REQUESTS	0x100
+#define PF_INCOMING	0x200
 
 #define RATEHISTORY 20
 #define MAXPIECEMSGS 128
@@ -46,11 +47,13 @@ struct peer {
     struct event in_ev;
     struct event out_ev;
 
-    struct input_reader *reader;
-
     unsigned long rate_to_me[RATEHISTORY];
     unsigned long rate_from_me[RATEHISTORY];
 
+    size_t state_bytes;
+    uint8_t net_state;
+    struct io_buffer net_in;
+
     BTPDQ_ENTRY(peer) cm_entry;
 
     BTPDQ_ENTRY(peer) rq_entry;