about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--btpd/Makefile.am1
-rw-r--r--btpd/btpd.c9
-rw-r--r--btpd/btpd.h6
-rw-r--r--btpd/net.c276
-rw-r--r--btpd/net.h66
-rw-r--r--btpd/net_buf.c234
-rw-r--r--btpd/net_buf.h59
-rw-r--r--btpd/peer.c124
-rw-r--r--btpd/peer.h5
-rw-r--r--btpd/policy_subr.c31
10 files changed, 431 insertions, 380 deletions
diff --git a/btpd/Makefile.am b/btpd/Makefile.am
index b18b432..7135b63 100644
--- a/btpd/Makefile.am
+++ b/btpd/Makefile.am
@@ -3,6 +3,7 @@ btpd_SOURCES=\
 	btpd.c btpd.h\
 	cli_if.c\
 	net.c net.h\
+	net_buf.c net_buf.h\
 	queue.h \
 	peer.c peer.h\
 	policy_choke.c policy_if.c policy_subr.c policy.h\
diff --git a/btpd/btpd.c b/btpd/btpd.c
index 9082648..365573f 100644
--- a/btpd/btpd.c
+++ b/btpd/btpd.c
@@ -145,6 +145,15 @@ btpd_init(void)
 		 "More could be beneficial to the download performance.\n",
 		 nfiles);
     btpd.maxpeers = nfiles - 20;
+
+    btpd.choke_msg = nb_create_choke();
+    nb_hold(btpd.choke_msg);
+    btpd.unchoke_msg = nb_create_unchoke();
+    nb_hold(btpd.unchoke_msg);
+    btpd.interest_msg = nb_create_interest();
+    nb_hold(btpd.interest_msg);
+    btpd.uninterest_msg = nb_create_uninterest();
+    nb_hold(btpd.uninterest_msg);
 }
 
 void
diff --git a/btpd/btpd.h b/btpd/btpd.h
index cd53e76..ef073a5 100644
--- a/btpd/btpd.h
+++ b/btpd/btpd.h
@@ -17,6 +17,7 @@
 #include "benc.h"
 #include "metainfo.h"
 #include "iobuf.h"
+#include "net_buf.h"
 #include "net.h"
 #include "peer.h"
 #include "torrent.h"
@@ -78,6 +79,11 @@ struct btpd {
     struct event sigint;
     struct event sigterm;
     struct event sigchld;
+
+    struct net_buf *choke_msg;
+    struct net_buf *unchoke_msg;
+    struct net_buf *interest_msg;
+    struct net_buf *uninterest_msg;
 };
 
 extern struct btpd btpd;
diff --git a/btpd/net.c b/btpd/net.c
index 51dc120..081cdc4 100644
--- a/btpd/net.c
+++ b/btpd/net.c
@@ -18,8 +18,6 @@
 
 #include "btpd.h"
 
-#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })
-
 #define min(x, y) ((x) <= (y) ? (x) : (y))
 
 static unsigned long
@@ -70,101 +68,6 @@ net_read32(void *buf)
     return ntohl(*(uint32_t *)buf);
 }
 
-static void
-kill_buf_no(char *buf, size_t len)
-{
-    //Nothing
-}
-
-static void
-kill_buf_free(char *buf, size_t len)
-{
-    free(buf);
-}
-
-int
-nb_drop(struct net_buf *nb)
-{
-    assert(nb->refs > 0);
-    nb->refs--;
-    if (nb->refs == 0) {
-	nb->kill_buf(nb->buf, nb->len);
-	free(nb);
-	return 1;
-    } else
-	return 0;
-}
-
-void
-nb_hold(struct net_buf *nb)
-{
-    nb->refs++;
-}
-
-struct net_buf *
-nb_create_alloc(short type, size_t len)
-{
-    struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len);
-    nb->type = type;
-    nb->buf = (char *)(nb + 1);
-    nb->len = len;
-    nb->kill_buf = kill_buf_no;
-    return nb;
-}
-
-struct net_buf *
-nb_create_set(short type, char *buf, size_t len,
-    void (*kill_buf)(char *, size_t))
-{
-    struct net_buf *nb = btpd_calloc(1, sizeof(*nb));
-    nb->type = type;
-    nb->buf = buf;
-    nb->len = len;
-    nb->kill_buf = kill_buf;
-    return nb;
-}
-
-uint32_t
-nb_get_index(struct net_buf *nb)
-{
-    switch (nb->type) {
-    case NB_CANCEL:
-    case NB_HAVE:
-    case NB_PIECE:
-    case NB_REQUEST:
-	return net_read32(nb->buf + 5);
-    default:
-	abort();
-    }
-}
-
-uint32_t
-nb_get_begin(struct net_buf *nb)
-{
-    switch (nb->type) {
-    case NB_CANCEL:
-    case NB_PIECE:
-    case NB_REQUEST:
-	return net_read32(nb->buf + 9);
-    default:
-	abort();
-    }
-}
-
-uint32_t
-nb_get_length(struct net_buf *nb)
-{
-    switch (nb->type) {
-    case NB_CANCEL:
-    case NB_REQUEST:
-	return net_read32(nb->buf + 13);
-    case NB_PIECE:
-	return net_read32(nb->buf) - 9;
-    default:
-	abort();
-    }
-}
-
 void
 kill_shake(struct input_reader *reader)
 {
@@ -255,173 +158,6 @@ net_write(struct peer *p, unsigned long wmax)
     return nwritten;
 }
 
-void
-net_send(struct peer *p, struct net_buf *nb)
-{
-    struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
-    nl->nb = nb;
-    nb_hold(nb);
-
-    if (BTPDQ_EMPTY(&p->outq)) {
-	assert(p->outq_off == 0);
-	event_add(&p->out_ev, WRITE_TIMEOUT);
-    }
-    BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
-}
-
-
-/*
- * Remove a network buffer from the peer's outq.
- * If a part of the buffer already have been written
- * to the network it cannot be removed.
- *
- * Returns 1 if the buffer is removed, 0 if not.
- */
-int
-net_unsend(struct peer *p, struct nb_link *nl)
-{
-    if (!(nl == BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) {
-	BTPDQ_REMOVE(&p->outq, nl, entry);
-	nb_drop(nl->nb);
-	free(nl);
-	if (BTPDQ_EMPTY(&p->outq)) {
-	    if (p->flags & PF_ON_WRITEQ) {
-		BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
-		p->flags &= ~PF_ON_WRITEQ;
-	    } else
-		event_del(&p->out_ev);
-	}
-	return 1;
-    } else
-	return 0;
-}
-
-void
-net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
-	       char *block, size_t blen)
-{
-    struct net_buf *head, *piece;
-
-    btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen);
-
-    head = nb_create_alloc(NB_PIECE, 13);
-    net_write32(head->buf, 9 + blen);
-    head->buf[4] = MSG_PIECE;
-    net_write32(head->buf + 5, index);
-    net_write32(head->buf + 9, begin);
-    net_send(p, head);
-
-    piece = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free);
-    net_send(p, piece);
-}
-
-void
-net_send_request(struct peer *p, struct piece_req *req)
-{
-    struct net_buf *out = nb_create_alloc(NB_REQUEST, 17);
-    net_write32(out->buf, 13);
-    out->buf[4] = MSG_REQUEST;
-    net_write32(out->buf + 5, req->index);
-    net_write32(out->buf + 9, req->begin);
-    net_write32(out->buf + 13, req->length);
-    net_send(p, out);
-}
-
-void
-net_send_cancel(struct peer *p, struct piece_req *req)
-{
-    struct net_buf *out = nb_create_alloc(NB_CANCEL, 17);
-    net_write32(out->buf, 13);
-    out->buf[4] = MSG_CANCEL;
-    net_write32(out->buf + 5, req->index);
-    net_write32(out->buf + 9, req->begin);
-    net_write32(out->buf + 13, req->length);
-    net_send(p, out);
-}
-
-void
-net_send_have(struct peer *p, uint32_t index)
-{
-    struct net_buf *out = nb_create_alloc(NB_HAVE, 9);
-    net_write32(out->buf, 5);
-    out->buf[4] = MSG_HAVE;
-    net_write32(out->buf + 5, index);
-    net_send(p, out);
-}
-
-void
-net_send_multihave(struct peer *p)
-{
-    struct torrent *tp = p->tp;
-    struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces);
-    for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) {
-	if (has_bit(tp->piece_field, i)) {
-	    net_write32(out->buf + count * 9, 5);
-	    out->buf[count * 9 + 4] = MSG_HAVE;
-	    net_write32(out->buf + count * 9 + 5, i);
-	    count++;
-	}
-    }
-    net_send(p, out);
-}
-
-void
-net_send_onesized(struct peer *p, char mtype, int btype)
-{
-    struct net_buf *out = nb_create_alloc(btype, 5);
-    net_write32(out->buf, 1);
-    out->buf[4] = mtype;
-    net_send(p, out);    
-}
-
-void
-net_send_unchoke(struct peer *p)
-{
-    net_send_onesized(p, MSG_UNCHOKE, NB_UNCHOKE);
-}
-
-void
-net_send_choke(struct peer *p)
-{
-    net_send_onesized(p, MSG_CHOKE, NB_CHOKE);
-}
-
-void
-net_send_uninterest(struct peer *p)
-{
-    net_send_onesized(p, MSG_UNINTEREST, NB_UNINTEREST);
-}
-
-void
-net_send_interest(struct peer *p)
-{
-    net_send_onesized(p, MSG_INTEREST, NB_INTEREST);
-}
-
-void
-net_send_bitfield(struct peer *p)
-{
-    uint32_t plen = ceil(p->tp->meta.npieces / 8.0);
-
-    struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5);
-    net_write32(out->buf, plen + 1);
-    out->buf[4] = MSG_BITFIELD;
-    net_send(p, out);
-    
-    out = nb_create_set(NB_BITDATA, p->tp->piece_field, plen, kill_buf_no);
-    net_send(p, out);
-}
-
-void
-net_send_shake(struct peer *p)
-{
-    struct net_buf *out = nb_create_alloc(NB_SHAKE, 68);
-    bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28);
-    bcopy(p->tp->meta.info_hash, out->buf + 28, 20);
-    bcopy(btpd.peer_id, out->buf + 48, 20);
-    net_send(p, out);
-}
-
 static void
 kill_generic(struct input_reader *reader)
 {
@@ -761,7 +497,7 @@ net_shake_read(struct peer *p, unsigned long rmax)
 	    if (tp != NULL) {
 		hs->state = SHAKE_INFO;
 		p->tp = tp;
-		net_send_shake(p);
+		peer_send(p, nb_create_shake(p->tp));
 	    } else
 		goto bad_shake;
 	} else {
@@ -791,9 +527,11 @@ net_shake_read(struct peer *p, unsigned long rmax)
 	net_generic_reader(p);
 	if (p->tp->have_npieces > 0) {
 	    if (p->tp->have_npieces * 9 < 5 + ceil(p->tp->meta.npieces / 8.0))
-		net_send_multihave(p);
-	    else
-		net_send_bitfield(p);
+		peer_send(p, nb_create_multihave(p->tp));
+	    else {
+		peer_send(p, nb_create_bitfield(p->tp));
+		peer_send(p, nb_create_bitdata(p->tp));
+	    }
 	}
 	cm_on_new_peer(p);
     } else
@@ -826,7 +564,7 @@ net_handshake(struct peer *p, int incoming)
     hs->rd.kill = kill_shake;
 
     if (!incoming)
-	net_send_shake(p);
+	peer_send(p, nb_create_shake(p->tp));
 }
 
 int
diff --git a/btpd/net.h b/btpd/net.h
index 9fe7b30..58ea819 100644
--- a/btpd/net.h
+++ b/btpd/net.h
@@ -11,43 +11,7 @@
 #define MSG_PIECE	7
 #define MSG_CANCEL	8
 
-#define NB_CHOKE	0
-#define NB_UNCHOKE	1
-#define NB_INTEREST	2
-#define NB_UNINTEREST	3
-#define NB_HAVE		4
-#define NB_BITFIELD	5
-#define NB_REQUEST	6
-#define NB_PIECE	7
-#define NB_CANCEL	8
-#define NB_TORRENTDATA	10
-#define NB_MULTIHAVE	11
-#define NB_BITDATA	12
-#define NB_SHAKE	13
-
-struct net_buf {
-    short type;
-    unsigned refs;
-    char *buf;
-    size_t len;
-    void (*kill_buf)(char *, size_t);
-};
-
-struct nb_link {
-    struct net_buf *nb;
-    BTPDQ_ENTRY(nb_link) entry;
-};
-
-BTPDQ_HEAD(nb_tq, nb_link);
-
-struct net_buf *nb_create_alloc(short type, size_t len);
-struct net_buf *nb_create_set(short type, char *buf, size_t len,
-    void (*kill_buf)(char *, size_t));
-int nb_drop(struct net_buf *nb);
-void nb_hold(struct net_buf *nb);
-uint32_t nb_get_index(struct net_buf *nb);
-uint32_t nb_get_begin(struct net_buf *nb);
-uint32_t nb_get_length(struct net_buf *nb);
+#define WRITE_TIMEOUT (& (struct timeval) { 60, 0 })
 
 struct peer;
 
@@ -94,36 +58,18 @@ struct generic_reader {
     char _io_buf[MAX_INPUT_LEFT];
 };
 
-struct piece_req {
-    uint32_t index, begin, length;
-    struct iob_link *head; /* Pointer to outgoing piece. */
-    BTPDQ_ENTRY(piece_req) entry;
-};
-
-BTPDQ_HEAD(piece_req_tq, piece_req);
-
 void net_connection_cb(int sd, short type, void *arg);
 void net_bw_rate(void);
 void net_bw_cb(int sd, short type, void *arg);
 
-struct peer;
-
-void net_send_uninterest(struct peer *p);
-void net_send_interest(struct peer *p);
-void net_send_unchoke(struct peer *p);
-void net_send_choke(struct peer *p);
-
-void net_send_have(struct peer *p, uint32_t index);
-void net_send_request(struct peer *p, struct piece_req *req);
-void net_send_piece(struct peer *p, uint32_t index, uint32_t begin,
-    char *block, size_t blen);
-void net_send_cancel(struct peer *p, struct piece_req *req);
-int net_unsend(struct peer *p, struct nb_link *nl);
-void net_handshake(struct peer *p, int incoming);
-
 void net_read_cb(int sd, short type, void *arg);
 void net_write_cb(int sd, short type, void *arg);
+
+void net_handshake(struct peer *p, int incoming);
 int net_connect2(struct sockaddr *sa, socklen_t salen, int *sd);
 int net_connect(const char *ip, int port, int *sd);
 
+void net_write32(void *buf, uint32_t num);
+uint32_t net_read32(void *buf);
+
 #endif
diff --git a/btpd/net_buf.c b/btpd/net_buf.c
new file mode 100644
index 0000000..d39f393
--- /dev/null
+++ b/btpd/net_buf.c
@@ -0,0 +1,234 @@
+#include <math.h>
+#include <string.h>
+
+#include "btpd.h"
+
+static void
+kill_buf_no(char *buf, size_t len)
+{
+
+}
+
+static void
+kill_buf_free(char *buf, size_t len)
+{
+    free(buf);
+}
+
+static struct net_buf *
+nb_create_alloc(short type, size_t len)
+{
+    struct net_buf *nb = btpd_calloc(1, sizeof(*nb) + len);
+    nb->type = type;
+    nb->buf = (char *)(nb + 1);
+    nb->len = len;
+    nb->kill_buf = kill_buf_no;
+    return nb;
+}
+
+static struct net_buf *
+nb_create_set(short type, char *buf, size_t len,
+    void (*kill_buf)(char *, size_t))
+{
+    struct net_buf *nb = btpd_calloc(1, sizeof(*nb));
+    nb->type = type;
+    nb->buf = buf;
+    nb->len = len;
+    nb->kill_buf = kill_buf;
+    return nb;
+}
+
+static struct net_buf *
+nb_create_onesized(char mtype, int btype)
+{
+    struct net_buf *out = nb_create_alloc(btype, 5);
+    net_write32(out->buf, 1);
+    out->buf[4] = mtype;
+    return out;
+}
+
+struct net_buf *
+nb_create_piece(uint32_t index, uint32_t begin, size_t blen)
+{
+    struct net_buf *out;
+
+    btpd_log(BTPD_L_MSG, "send piece: %u, %u, %u\n", index, begin, blen);
+
+    out = nb_create_alloc(NB_PIECE, 13);
+    net_write32(out->buf, 9 + blen);
+    out->buf[4] = MSG_PIECE;
+    net_write32(out->buf + 5, index);
+    net_write32(out->buf + 9, begin);
+    return out;
+}
+
+struct net_buf *
+nb_create_torrentdata(char *block, size_t blen)
+{
+    struct net_buf *out;
+    out = nb_create_set(NB_TORRENTDATA, block, blen, kill_buf_free);
+    return out;
+}
+
+struct net_buf *
+nb_create_request(uint32_t index, uint32_t begin, uint32_t length)
+{
+    struct net_buf *out = nb_create_alloc(NB_REQUEST, 17);
+    net_write32(out->buf, 13);
+    out->buf[4] = MSG_REQUEST;
+    net_write32(out->buf + 5, index);
+    net_write32(out->buf + 9, begin);
+    net_write32(out->buf + 13, length);
+    return out;
+}
+
+struct net_buf *
+nb_create_cancel(uint32_t index, uint32_t begin, uint32_t length)
+{
+    struct net_buf *out = nb_create_alloc(NB_CANCEL, 17);
+    net_write32(out->buf, 13);
+    out->buf[4] = MSG_CANCEL;
+    net_write32(out->buf + 5, index);
+    net_write32(out->buf + 9, begin);
+    net_write32(out->buf + 13, length);
+    return out;
+}
+
+struct net_buf *
+nb_create_have(uint32_t index)
+{
+    struct net_buf *out = nb_create_alloc(NB_HAVE, 9);
+    net_write32(out->buf, 5);
+    out->buf[4] = MSG_HAVE;
+    net_write32(out->buf + 5, index);
+    return out;
+}
+
+struct net_buf *
+nb_create_multihave(struct torrent *tp)
+{
+    struct net_buf *out = nb_create_alloc(NB_MULTIHAVE, 9 * tp->have_npieces);
+    for (uint32_t i = 0, count = 0; count < tp->have_npieces; i++) {
+	if (has_bit(tp->piece_field, i)) {
+	    net_write32(out->buf + count * 9, 5);
+	    out->buf[count * 9 + 4] = MSG_HAVE;
+	    net_write32(out->buf + count * 9 + 5, i);
+	    count++;
+	}
+    }
+    return out;
+}
+
+struct net_buf *
+nb_create_unchoke(void)
+{
+    return nb_create_onesized(MSG_UNCHOKE, NB_UNCHOKE);
+}
+
+struct net_buf *
+nb_create_choke(void)
+{
+    return nb_create_onesized(MSG_CHOKE, NB_CHOKE);
+}
+
+struct net_buf *
+nb_create_uninterest(void)
+{
+    return nb_create_onesized(MSG_UNINTEREST, NB_UNINTEREST);
+}
+
+struct net_buf *
+nb_create_interest(void)
+{
+    return nb_create_onesized(MSG_INTEREST, NB_INTEREST);
+}
+
+struct net_buf *
+nb_create_bitfield(struct torrent *tp)
+{
+    uint32_t plen = ceil(tp->meta.npieces / 8.0);
+
+    struct net_buf *out = nb_create_alloc(NB_BITFIELD, 5);
+    net_write32(out->buf, plen + 1);
+    out->buf[4] = MSG_BITFIELD;
+    return out;
+}
+
+struct net_buf *
+nb_create_bitdata(struct torrent *tp)
+{
+    uint32_t plen = ceil(tp->meta.npieces / 8.0);
+    struct net_buf *out =
+	nb_create_set(NB_BITDATA, tp->piece_field, plen, kill_buf_no);
+    return out;
+}
+
+struct net_buf *
+nb_create_shake(struct torrent *tp)
+{
+    struct net_buf *out = nb_create_alloc(NB_SHAKE, 68);
+    bcopy("\x13""BitTorrent protocol\0\0\0\0\0\0\0\0", out->buf, 28);
+    bcopy(tp->meta.info_hash, out->buf + 28, 20);
+    bcopy(btpd.peer_id, out->buf + 48, 20);
+    return out;
+}
+
+uint32_t
+nb_get_index(struct net_buf *nb)
+{
+    switch (nb->type) {
+    case NB_CANCEL:
+    case NB_HAVE:
+    case NB_PIECE:
+    case NB_REQUEST:
+	return net_read32(nb->buf + 5);
+    default:
+	abort();
+    }
+}
+
+uint32_t
+nb_get_begin(struct net_buf *nb)
+{
+    switch (nb->type) {
+    case NB_CANCEL:
+    case NB_PIECE:
+    case NB_REQUEST:
+	return net_read32(nb->buf + 9);
+    default:
+	abort();
+    }
+}
+
+uint32_t
+nb_get_length(struct net_buf *nb)
+{
+    switch (nb->type) {
+    case NB_CANCEL:
+    case NB_REQUEST:
+	return net_read32(nb->buf + 13);
+    case NB_PIECE:
+	return net_read32(nb->buf) - 9;
+    default:
+	abort();
+    }
+}
+
+int
+nb_drop(struct net_buf *nb)
+{
+    assert(nb->refs > 0);
+    nb->refs--;
+    if (nb->refs == 0) {
+	nb->kill_buf(nb->buf, nb->len);
+	free(nb);
+	return 1;
+    } else
+	return 0;
+}
+
+void
+nb_hold(struct net_buf *nb)
+{
+    nb->refs++;
+}
diff --git a/btpd/net_buf.h b/btpd/net_buf.h
new file mode 100644
index 0000000..7082c88
--- /dev/null
+++ b/btpd/net_buf.h
@@ -0,0 +1,59 @@
+#ifndef BTPD_NET_BUF_H
+#define BTPD_NET_BUF_H
+
+#define NB_CHOKE	0
+#define NB_UNCHOKE	1
+#define NB_INTEREST	2
+#define NB_UNINTEREST	3
+#define NB_HAVE		4
+#define NB_BITFIELD	5
+#define NB_REQUEST	6
+#define NB_PIECE	7
+#define NB_CANCEL	8
+#define NB_TORRENTDATA	10
+#define NB_MULTIHAVE	11
+#define NB_BITDATA	12
+#define NB_SHAKE	13
+
+struct net_buf {
+    short type;
+    unsigned refs;
+    char *buf;
+    size_t len;
+    void (*kill_buf)(char *, size_t);
+};
+
+struct nb_link {
+    struct net_buf *nb;
+    BTPDQ_ENTRY(nb_link) entry;
+};
+
+BTPDQ_HEAD(nb_tq, nb_link);
+
+struct torrent;
+struct peer;
+
+struct net_buf *nb_create_piece(uint32_t index, uint32_t begin, size_t blen);
+struct net_buf *nb_create_torrentdata(char *block, size_t blen);
+struct net_buf *nb_create_request(uint32_t index,
+    uint32_t begin, uint32_t length);
+struct net_buf *nb_create_cancel(uint32_t index,
+    uint32_t begin, uint32_t length);
+struct net_buf *nb_create_have(uint32_t index);
+struct net_buf *nb_create_multihave(struct torrent *tp);
+struct net_buf *nb_create_unchoke(void);
+struct net_buf *nb_create_choke(void);
+struct net_buf *nb_create_uninterest(void);
+struct net_buf *nb_create_interest(void);
+struct net_buf *nb_create_bitfield(struct torrent *tp);
+struct net_buf *nb_create_bitdata(struct torrent *tp);
+struct net_buf *nb_create_shake(struct torrent *tp);
+
+int nb_drop(struct net_buf *nb);
+void nb_hold(struct net_buf *nb);
+
+uint32_t nb_get_index(struct net_buf *nb);
+uint32_t nb_get_begin(struct net_buf *nb);
+uint32_t nb_get_length(struct net_buf *nb);
+
+#endif
diff --git a/btpd/peer.c b/btpd/peer.c
index 7126bdc..6f1eb3d 100644
--- a/btpd/peer.c
+++ b/btpd/peer.c
@@ -21,7 +21,6 @@ void
 peer_kill(struct peer *p)
 {
     struct nb_link *nl;
-    struct piece_req *req;
 
     btpd_log(BTPD_L_CONN, "killed peer.\n");
 
@@ -45,11 +44,12 @@ peer_kill(struct peer *p)
 	free(nl);
 	nl = next;
     }
-    req = BTPDQ_FIRST(&p->my_reqs);
-    while (req != NULL) {
-	struct piece_req *next = BTPDQ_NEXT(req, entry);
-	free(req);
-	req = next;
+    nl = BTPDQ_FIRST(&p->my_reqs);
+    while (nl != NULL) {
+	struct nb_link *next = BTPDQ_NEXT(nl, entry);
+	nb_drop(nl->nb);
+	free(nl);
+	nl = next;
     }
 
     p->reader->kill(p->reader);
@@ -60,31 +60,81 @@ peer_kill(struct peer *p)
 }
 
 void
+peer_send(struct peer *p, struct net_buf *nb)
+{
+    struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
+    nl->nb = nb;
+    nb_hold(nb);
+
+    if (BTPDQ_EMPTY(&p->outq)) {
+	assert(p->outq_off == 0);
+	event_add(&p->out_ev, WRITE_TIMEOUT);
+    }
+    BTPDQ_INSERT_TAIL(&p->outq, nl, entry);
+}
+
+
+/*
+ * Remove a network buffer from the peer's outq.
+ * If a part of the buffer already have been written
+ * to the network it cannot be removed.
+ *
+ * Returns 1 if the buffer is removed, 0 if not.
+ */
+int
+peer_unsend(struct peer *p, struct nb_link *nl)
+{
+    if (!(nl == BTPDQ_FIRST(&p->outq) && p->outq_off > 0)) {
+	BTPDQ_REMOVE(&p->outq, nl, entry);
+	nb_drop(nl->nb);
+	free(nl);
+	if (BTPDQ_EMPTY(&p->outq)) {
+	    if (p->flags & PF_ON_WRITEQ) {
+		BTPDQ_REMOVE(&btpd.writeq, p, wq_entry);
+		p->flags &= ~PF_ON_WRITEQ;
+	    } else
+		event_del(&p->out_ev);
+	}
+	return 1;
+    } else
+	return 0;
+}
+
+void
 peer_request(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
 {
     if (p->tp->endgame == 0)
 	assert(p->nreqs_out < MAXPIPEDREQUESTS);
     p->nreqs_out++;
-    struct piece_req *req = btpd_calloc(1, sizeof(*req));
-    req->index = index;
-    req->begin = begin;
-    req->length = len;
-    BTPDQ_INSERT_TAIL(&p->my_reqs, req, entry);
-    net_send_request(p, req);
+    struct net_buf *nb = nb_create_request(index, begin, len);
+    struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
+    nl->nb = nb;
+    nb_hold(nb);
+    BTPDQ_INSERT_TAIL(&p->my_reqs, nl, entry);
+    peer_send(p, nb);
 }
 
 void
 peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
 {
-    struct piece_req *req;
+    struct net_buf *nb = NULL;
+    struct nb_link *nl;
 again:
-    BTPDQ_FOREACH(req, &p->my_reqs, entry)
-	if (index == req->index && begin == req->begin && len == req->length)
+    BTPDQ_FOREACH(nl, &p->my_reqs, entry) {
+	int match = nb_get_begin(nl->nb) == begin
+	    && nb_get_index(nl->nb) == index
+	    && nb_get_length(nl->nb) == len;
+	if (match)
 	    break;
-    if (req != NULL) {
-	net_send_cancel(p, req);
-	BTPDQ_REMOVE(&p->my_reqs, req, entry);
-	free(req);
+    }
+    if (nl != NULL) {
+	if (nb == NULL) {
+	    nb =  nb_create_cancel(index, begin, len);
+	    peer_send(p, nb);
+	}
+	BTPDQ_REMOVE(&p->my_reqs, nl, entry);
+	nb_drop(nl->nb);
+	free(nl);
 	p->nreqs_out--;
 	goto again;
     }
@@ -93,14 +143,14 @@ again:
 void
 peer_have(struct peer *p, uint32_t index)
 {
-    net_send_have(p, index);
+    peer_send(p, nb_create_have(index));
 }
 
 void
 peer_unchoke(struct peer *p)
 {
     p->flags &= ~PF_I_CHOKE;
-    net_send_unchoke(p);
+    peer_send(p, btpd.unchoke_msg);
 }
 
 void
@@ -112,14 +162,14 @@ peer_choke(struct peer *p)
 	if (nl->nb->type == NB_PIECE) {
 	    struct nb_link *data = next;
 	    next = BTPDQ_NEXT(next, entry);
-	    if (net_unsend(p, nl))
-		net_unsend(p, data);
+	    if (peer_unsend(p, nl))
+		peer_unsend(p, data);
 	}
 	nl = next;
     }
 
     p->flags |= PF_I_CHOKE;
-    net_send_choke(p);    
+    peer_send(p, btpd.choke_msg);
 }
 
 void
@@ -129,7 +179,7 @@ peer_want(struct peer *p, uint32_t index)
     p->nwant++;
     if (p->nwant == 1) {
 	p->flags |= PF_I_WANT;
-	net_send_interest(p);
+	peer_send(p, btpd.interest_msg);
     }
 }
 
@@ -140,7 +190,7 @@ peer_unwant(struct peer *p, uint32_t index)
     p->nwant--;
     if (p->nwant == 0) {
 	p->flags &= ~PF_I_WANT;
-	net_send_uninterest(p);
+	peer_send(p, btpd.uninterest_msg);
     }
 }
 
@@ -275,16 +325,17 @@ void
 peer_on_piece(struct peer *p, uint32_t index, uint32_t begin,
     uint32_t length, const char *data)
 {
-    struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
-    if (req != NULL &&
-	req->index == index &&
-	req->begin == begin &&
-	req->length == length) {
+    struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
+    if (nl != NULL &&
+	nb_get_begin(nl->nb) == begin &&
+	nb_get_index(nl->nb) == index &&
+	nb_get_length(nl->nb) == length) {
 
 	assert(p->nreqs_out > 0);
 	p->nreqs_out--;
-	BTPDQ_REMOVE(&p->my_reqs, req, entry);
-	free(req);
+	BTPDQ_REMOVE(&p->my_reqs, nl, entry);
+	nb_drop(nl->nb);
+	free(nl);
 	
 	cm_on_block(p, index, begin, length, data);
     }
@@ -296,7 +347,8 @@ peer_on_request(struct peer *p, uint32_t index, uint32_t begin,
 {
     off_t cbegin = index * p->tp->meta.piece_length + begin;
     char * content = torrent_get_bytes(p->tp, cbegin, length);
-    net_send_piece(p, index, begin, content, length);
+    peer_send(p, nb_create_piece(index, begin, length));
+    peer_send(p, nb_create_torrentdata(content, length));
 }
 
 void
@@ -310,8 +362,8 @@ peer_on_cancel(struct peer *p, uint32_t index, uint32_t begin,
 	    && nb_get_index(nl->nb) == index
 	    && nb_get_length(nl->nb) == length) {
 	    struct nb_link *data = BTPDQ_NEXT(nl, entry);
-	    if (net_unsend(p, nl))
-		net_unsend(p, data);
+	    if (peer_unsend(p, nl))
+		peer_unsend(p, data);
 	    break;
 	}
 }
diff --git a/btpd/peer.h b/btpd/peer.h
index 66cbfd2..dd9db0f 100644
--- a/btpd/peer.h
+++ b/btpd/peer.h
@@ -25,7 +25,7 @@ struct peer {
 
     struct torrent *tp;
 
-    struct piece_req_tq my_reqs;
+    struct nb_tq my_reqs;
 
     unsigned nreqs_out;
 
@@ -48,6 +48,9 @@ struct peer {
 
 BTPDQ_HEAD(peer_tq, peer);
 
+void peer_send(struct peer *p, struct net_buf *nb);
+int peer_unsend(struct peer *p, struct nb_link *nl);
+
 void peer_unchoke(struct peer *p);
 void peer_choke(struct peer *p);
 void peer_unwant(struct peer *p, uint32_t index);
diff --git a/btpd/policy_subr.c b/btpd/policy_subr.c
index 8f8e88e..142abf8 100644
--- a/btpd/policy_subr.c
+++ b/btpd/policy_subr.c
@@ -396,20 +396,22 @@ cm_unassign_requests(struct peer *p)
     while (pc != NULL) {
 	int was_full = piece_full(pc);
 
-	struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
-	while (req != NULL) {
-	    struct piece_req *next = BTPDQ_NEXT(req, entry);
+	struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
+	while (nl != NULL) {
+	    struct nb_link *next = BTPDQ_NEXT(nl, entry);
 
-	    if (pc->index == req->index) {
+	    if (pc->index == nb_get_index(nl->nb)) {
+		uint32_t block = nb_get_begin(nl->nb) / PIECE_BLOCKLEN;
 		// XXX: Needs to be looked at if we introduce snubbing.
-		assert(has_bit(pc->down_field, req->begin / PIECE_BLOCKLEN));
-		clear_bit(pc->down_field, req->begin / PIECE_BLOCKLEN);
+		assert(has_bit(pc->down_field, block));
+		clear_bit(pc->down_field, block);
 		pc->nbusy--;
-		BTPDQ_REMOVE(&p->my_reqs, req, entry);
-		free(req);
+		BTPDQ_REMOVE(&p->my_reqs, nl, entry);
+		nb_drop(nl->nb);
+		free(nl);
 	    }
 	    
-	    req = next;
+	    nl = next;
 	}
 	
 	if (was_full && !piece_full(pc))
@@ -449,11 +451,12 @@ cm_assign_requests_eg(struct peer *p)
 void
 cm_unassign_requests_eg(struct peer *p)
 {
-    struct piece_req *req = BTPDQ_FIRST(&p->my_reqs);
-    while (req != NULL) {
-	struct piece_req *next = BTPDQ_NEXT(req, entry);
-	free(req);
-	req = next;
+    struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
+    while (nl != NULL) {
+	struct nb_link *next = BTPDQ_NEXT(nl, entry);
+	nb_drop(nl->nb);
+	free(nl);
+	nl = next;
     }
     BTPDQ_INIT(&p->my_reqs);
     p->nreqs_out = 0;