about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard Nyberg <rnyberg@murmeldjur.se>2005-09-20 19:24:11 +0000
committerRichard Nyberg <rnyberg@murmeldjur.se>2005-09-20 19:24:11 +0000
commitf31e2d8b897cee6006a1494c4a195caa8c2f2d7e (patch)
tree1b25afd0826a8c6413a95098098e3ef1ae4d02f3
parentd8720e889c6f4e96681d6bc494db6171742e715a (diff)
downloadbtpd-f31e2d8b897cee6006a1494c4a195caa8c2f2d7e.tar.gz
btpd-f31e2d8b897cee6006a1494c4a195caa8c2f2d7e.zip
* Allocate request messages on piece creation. The request objects can
  be shared by several peers. At least in end game.
* Link blocks with the peers we are loading them from and vice versa.
* Limit the number of requests / peer in end game too.
* Improve end game by using some sort of round robin for block requests.

-rw-r--r--btpd/peer.c83
-rw-r--r--btpd/peer.h19
-rw-r--r--btpd/policy.h6
-rw-r--r--btpd/policy_if.c51
-rw-r--r--btpd/policy_subr.c243
-rw-r--r--btpd/torrent.h13
6 files changed, 278 insertions, 137 deletions
diff --git a/btpd/peer.c b/btpd/peer.c
index fdd79ee..71bcd64 100644
--- a/btpd/peer.c
+++ b/btpd/peer.c
@@ -44,13 +44,6 @@ peer_kill(struct peer *p)
 	free(nl);
 	nl = next;
     }
-    nl = BTPDQ_FIRST(&p->my_reqs);
-    while (nl != NULL) {
-	struct nb_link *next = BTPDQ_NEXT(nl, entry);
-	nb_drop(nl->nb);
-	free(nl);
-	nl = next;
-    }
 
     p->reader->kill(p->reader);
     if (p->piece_field != NULL)
@@ -119,49 +112,40 @@ peer_sent(struct peer *p, struct net_buf *nb)
 }
 
 void
-peer_request(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
+peer_request(struct peer *p, struct block_request *req)
 {
-    if (p->tp->endgame == 0)
-	assert(p->nreqs_out < MAXPIPEDREQUESTS);
+    assert(p->nreqs_out < MAXPIPEDREQUESTS);
     p->nreqs_out++;
-    struct net_buf *nb = nb_create_request(index, begin, len);
-    struct nb_link *nl = btpd_calloc(1, sizeof(*nl));
-    nl->nb = nb;
-    nb_hold(nb);
-    BTPDQ_INSERT_TAIL(&p->my_reqs, nl, entry);
-    peer_send(p, nb);
+    BTPDQ_INSERT_TAIL(&p->my_reqs, req, p_entry);
+    peer_send(p, req->blk->msg);
+}
+
+int
+peer_requested(struct peer *p, struct block *blk)
+{
+    struct block_request *req;
+    BTPDQ_FOREACH(req, &p->my_reqs, p_entry)
+	if (req->blk == blk)
+	    return 1;
+    return 0;
 }
 
 void
-peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len)
+peer_cancel(struct peer *p, struct block_request *req, struct net_buf *nb)
 {
-    struct net_buf *nb = NULL;
+    BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+    p->nreqs_out--;
+
+    int removed = 0;
     struct nb_link *nl;
-again:
-    BTPDQ_FOREACH(nl, &p->my_reqs, entry) {
-	int match = nb_get_begin(nl->nb) == begin
-	    && nb_get_index(nl->nb) == index
-	    && nb_get_length(nl->nb) == len;
-	if (match)
+    BTPDQ_FOREACH(nl, &p->outq, entry) {
+	if (nl->nb == req->blk->msg) {
+	    removed = peer_unsend(p, nl);
 	    break;
-    }
-    if (nl != NULL) {
-	if (nb == NULL) {
-	    nb =  nb_create_cancel(index, begin, len);
-	    peer_send(p, nb);
 	}
-	BTPDQ_REMOVE(&p->my_reqs, nl, entry);
-	nb_drop(nl->nb);
-	free(nl);
-	p->nreqs_out--;
-	goto again;
     }
-}
-
-void
-peer_have(struct peer *p, uint32_t index)
-{
-    peer_send(p, nb_create_have(index));
+    if (!removed)
+	peer_send(p, nb);
 }
 
 void
@@ -343,19 +327,18 @@ void
 peer_on_piece(struct peer *p, uint32_t index, uint32_t begin,
     uint32_t length, const char *data)
 {
-    struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
-    if (nl != NULL &&
-	nb_get_begin(nl->nb) == begin &&
-	nb_get_index(nl->nb) == index &&
-	nb_get_length(nl->nb) == length) {
+    struct block_request *req = BTPDQ_FIRST(&p->my_reqs);
+    if (req == NULL)
+	return;
+    struct net_buf *nb = req->blk->msg;
+    if (nb_get_begin(nb) == begin &&
+	nb_get_index(nb) == index &&
+	nb_get_length(nb) == length) {
 
 	assert(p->nreqs_out > 0);
 	p->nreqs_out--;
-	BTPDQ_REMOVE(&p->my_reqs, nl, entry);
-	nb_drop(nl->nb);
-	free(nl);
-	
-	cm_on_block(p, index, begin, length, data);
+	BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+	cm_on_block(p, req, index, begin, length, data);
     }
 }
 
diff --git a/btpd/peer.h b/btpd/peer.h
index 675c9a8..a8eb80d 100644
--- a/btpd/peer.h
+++ b/btpd/peer.h
@@ -15,6 +15,15 @@
 #define MAXPIECEMSGS 128
 #define MAXPIPEDREQUESTS 10
 
+struct block_request {
+    struct peer *p;
+    struct block *blk;
+    BTPDQ_ENTRY(block_request) p_entry;
+    BTPDQ_ENTRY(block_request) blk_entry;
+};
+
+BTPDQ_HEAD(block_request_tq, block_request);
+
 struct peer {
     int sd;
     uint16_t flags;
@@ -26,7 +35,7 @@ struct peer {
 
     struct torrent *tp;
 
-    struct nb_tq my_reqs;
+    struct block_request_tq my_reqs;
 
     unsigned nreqs_out;
     unsigned npiece_msgs;
@@ -58,11 +67,11 @@ void peer_unchoke(struct peer *p);
 void peer_choke(struct peer *p);
 void peer_unwant(struct peer *p, uint32_t index);
 void peer_want(struct peer *p, uint32_t index);
-void peer_request(struct peer *p, uint32_t index,
-    uint32_t begin, uint32_t len);
-void peer_cancel(struct peer *p, uint32_t index, uint32_t begin, uint32_t len);
+void peer_request(struct peer *p, struct block_request *req);
+void peer_cancel(struct peer *p, struct block_request *req,
+    struct net_buf *nb);
 
-void peer_have(struct peer *p, uint32_t index);
+int peer_requested(struct peer *p, struct block *blk);
 
 unsigned long peer_get_rate(unsigned long *rates);
 
diff --git a/btpd/policy.h b/btpd/policy.h
index 76369a0..c039644 100644
--- a/btpd/policy.h
+++ b/btpd/policy.h
@@ -17,11 +17,11 @@ void cm_on_piece(struct piece *pc);
 struct piece *cm_new_piece(struct torrent *tp, uint32_t index);
 struct piece *cm_find_piece(struct torrent *tp, uint32_t index);
 unsigned cm_piece_assign_requests(struct piece *pc, struct peer *p);
-void cm_piece_assign_requests_eg(struct piece *pc, struct peer *p);
 unsigned  cm_assign_requests(struct peer *p);
 void cm_assign_requests_eg(struct peer *p);
 void cm_unassign_requests(struct peer *p);
 void cm_unassign_requests_eg(struct peer *p);
+void cm_piece_reorder_eg(struct piece *pc);
 
 // policy_if.c
 
@@ -39,8 +39,8 @@ void cm_on_uninterest(struct peer *p);
 void cm_on_download(struct peer *p);
 void cm_on_undownload(struct peer *p);
 void cm_on_piece_ann(struct peer *p, uint32_t index);
-void cm_on_block(struct peer *p, uint32_t index, uint32_t begin,
-    uint32_t length, const char *data);
+void cm_on_block(struct peer *p, struct block_request *req,
+    uint32_t index, uint32_t begin, uint32_t length, const char *data);
 
 void cm_on_ok_piece(struct piece *pc);
 void cm_on_bad_piece(struct piece *pc);
diff --git a/btpd/policy_if.c b/btpd/policy_if.c
index f1b2e44..2b49357 100644
--- a/btpd/policy_if.c
+++ b/btpd/policy_if.c
@@ -40,11 +40,10 @@ cm_on_piece_ann(struct peer *p, uint32_t index)
 	return;
     struct piece *pc = cm_find_piece(tp, index);
     if (tp->endgame) {
-	if (pc != NULL) {
-	    peer_want(p, index);
-	    if (!peer_chokes(p))
-		cm_piece_assign_requests_eg(pc, p);
-	}
+	assert(pc != NULL);
+	peer_want(p, index);
+	if (!peer_chokes(p) && !peer_laden(p))
+	    cm_assign_requests_eg(p);
     } else if (pc == NULL) {
 	peer_want(p, index);
 	if (!peer_chokes(p) && !peer_laden(p)) {
@@ -146,6 +145,7 @@ cm_on_ok_piece(struct piece *pc)
 	    if (peer_has(p, pc->index))
 		peer_unwant(p, pc->index);
 
+    assert(pc->nreqs == 0);
     piece_free(pc);
 
     if (torrent_has_all(tp)) {
@@ -178,8 +178,8 @@ cm_on_bad_piece(struct piece *pc)
     if (tp->endgame) {
 	struct peer *p;
 	BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
-	    if (peer_has(p, pc->index) && peer_leech_ok(p))
-		cm_piece_assign_requests_eg(pc, p);
+	    if (peer_has(p, pc->index) && peer_leech_ok(p) && !peer_laden(p))
+		cm_assign_requests_eg(p);
 	}
     } else
 	cm_on_piece_unfull(pc); // XXX: May get bad data again.
@@ -245,31 +245,46 @@ cm_on_lost_peer(struct peer *p)
 }
 
 void
-cm_on_block(struct peer *p, uint32_t index, uint32_t begin, uint32_t length,
-    const char *data)
+cm_on_block(struct peer *p, struct block_request *req,
+    uint32_t index, uint32_t begin, uint32_t length, const char *data)
 {
     struct torrent *tp = p->tp;
+    struct block *blk = req->blk;
+    struct piece *pc = blk->pc;
+
+    BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
+    free(req);
+    pc->nreqs--;
 
     off_t cbegin = index * p->tp->meta.piece_length + begin;
     torrent_put_bytes(p->tp, data, cbegin, length);
 
-    struct piece *pc = cm_find_piece(tp, index);
-    assert(pc != NULL);
-
-    uint32_t block = begin / PIECE_BLOCKLEN;
-    set_bit(pc->have_field, block);
+    set_bit(pc->have_field, begin / PIECE_BLOCKLEN);
     pc->ngot++;
 
     if (tp->endgame) {
-	BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
-	    if (peer_has(p, index) && p->nreqs_out > 0)
-		peer_cancel(p, index, begin, length);
+	if (!BTPDQ_EMPTY(&blk->reqs)) {
+	    struct net_buf *nb = nb_create_cancel(index, begin, length);
+	    nb_hold(nb);
+	    struct block_request *req = BTPDQ_FIRST(&blk->reqs);
+	    while (req != NULL) {
+		struct block_request *next = BTPDQ_NEXT(req, blk_entry);
+		peer_cancel(req->p, req, nb);
+		free(req);
+		pc->nreqs--;
+		req = next;
+	    }
+	    BTPDQ_INIT(&blk->reqs);
+	    nb_drop(nb);
 	}
+	cm_piece_reorder_eg(pc);
 	if (pc->ngot == pc->nblocks)
 	    cm_on_piece(pc);
+	if (peer_leech_ok(p) && !peer_laden(p))
+	    cm_assign_requests_eg(p);
     } else {
 	// XXX: Needs to be looked at if we introduce snubbing.
-	clear_bit(pc->down_field, block);
+	clear_bit(pc->down_field, begin / PIECE_BLOCKLEN);
 	pc->nbusy--;
 	if (pc->ngot == pc->nblocks)
 	    cm_on_piece(pc);
diff --git a/btpd/policy_subr.c b/btpd/policy_subr.c
index 142abf8..5683ed8 100644
--- a/btpd/policy_subr.c
+++ b/btpd/policy_subr.c
@@ -35,13 +35,14 @@ piece_alloc(struct torrent *tp, uint32_t index)
     assert(!has_bit(tp->busy_field, index)
 	&& tp->npcs_busy < tp->meta.npieces);
     struct piece *pc;
-    size_t mem, field;
+    size_t mem, field, blocks;
     unsigned nblocks;
     off_t piece_length = torrent_piece_size(tp, index);
 
     nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN);
+    blocks = sizeof(pc->blocks[0]) * nblocks;
     field = (size_t)ceil(nblocks / 8.0);
-    mem = sizeof(*pc) + field;
+    mem = sizeof(*pc) + field + blocks;
 
     pc = btpd_calloc(1, mem);
     pc->tp = tp;
@@ -49,13 +50,28 @@ piece_alloc(struct torrent *tp, uint32_t index)
     pc->have_field =
 	tp->block_field +
 	index * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17));
-    pc->nblocks = nblocks;
+
     pc->index = index;
+    pc->nblocks = nblocks;
+
+    pc->nreqs = 0;
+    pc->next_block = 0;
 
     for (unsigned i = 0; i < nblocks; i++)
 	if (has_bit(pc->have_field, i))
 	    pc->ngot++;
 
+    pc->blocks = (struct block *)(pc->down_field + field);
+    for (unsigned i = 0; i < nblocks; i++) {
+	uint32_t start = i * PIECE_BLOCKLEN;
+	uint32_t len = torrent_block_size(pc, i);
+	struct block *blk = &pc->blocks[i];
+	blk->pc = pc;
+	BTPDQ_INIT(&blk->reqs);
+	blk->msg = nb_create_request(index, start, len);
+	nb_hold(blk->msg);
+    }	
+
     tp->npcs_busy++;
     set_bit(tp->busy_field, index);
     BTPDQ_INSERT_HEAD(&tp->getlst, pc, entry);
@@ -70,6 +86,15 @@ piece_free(struct piece *pc)
     tp->npcs_busy--;
     clear_bit(tp->busy_field, pc->index);
     BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+    for (unsigned i = 0; i < pc->nblocks; i++) {
+	struct block_request *req = BTPDQ_FIRST(&pc->blocks[i].reqs);
+	while (req != NULL) {
+	    struct block_request *next = BTPDQ_NEXT(req, blk_entry);
+	    free(req);
+	    req = next;
+	}
+	nb_drop(pc->blocks[i].msg);
+    }
     free(pc);
 }
 
@@ -98,26 +123,65 @@ cm_should_enter_endgame(struct torrent *tp)
 }
 
 static void
+cm_piece_insert_eg(struct piece *pc)
+{
+    struct piece_tq *getlst = &pc->tp->getlst;
+    if (pc->nblocks == pc->ngot)
+	BTPDQ_INSERT_TAIL(getlst, pc, entry);
+    else {
+	unsigned r = pc->nreqs / (pc->nblocks - pc->ngot);
+	struct piece *it;
+	BTPDQ_FOREACH(it, getlst, entry) {
+	    if ((it->nblocks == it->ngot
+		    || r < it->nreqs / (it->nblocks - it->ngot))) {
+		BTPDQ_INSERT_BEFORE(it, pc, entry);
+		break;
+	    }
+	}
+	if (it == NULL)
+	    BTPDQ_INSERT_TAIL(getlst, pc, entry);
+    }
+}
+
+void
+cm_piece_reorder_eg(struct piece *pc)
+{
+    BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+    cm_piece_insert_eg(pc);
+}
+
+static void
 cm_enter_endgame(struct torrent *tp)
 {
     struct peer *p;
     struct piece *pc;
+    struct piece *pcs[tp->npcs_busy];
+    unsigned pi;
+
     btpd_log(BTPD_L_POL, "Entering end game\n");
     tp->endgame = 1;
+
+    pi = 0;
     BTPDQ_FOREACH(pc, &tp->getlst, entry) {
-	for (uint32_t i = 0; i < pc->nblocks; i++)
+	for (unsigned i = 0; i < pc->nblocks; i++)
 	    clear_bit(pc->down_field, i);
 	pc->nbusy = 0;
+	pcs[pi] = pc;
+	pi++;
+    }
+    BTPDQ_INIT(&tp->getlst);
+    while (pi > 0) {
+	pi--;
+	cm_piece_insert_eg(pcs[pi]);
     }
     BTPDQ_FOREACH(p, &tp->peers, cm_entry) {
 	assert(p->nwant == 0);
 	BTPDQ_FOREACH(pc, &tp->getlst, entry) {
-	    if (peer_has(p, pc->index)) {
+	    if (peer_has(p, pc->index))
 		peer_want(p, pc->index);
-		if (peer_leech_ok(p))
-		    cm_piece_assign_requests_eg(pc, p);
-	    }
 	}
+	if (p->nwant > 0 && peer_leech_ok(p) && !peer_laden(p))
+	    cm_assign_requests_eg(p);
     }
 }
 
@@ -320,6 +384,10 @@ cm_on_piece_unfull(struct piece *pc)
     }
 }
 
+#define INCNEXTBLOCK(pc) \
+    (pc)->next_block = ((pc)->next_block + 1) % (pc)->nblocks
+
+
 /*
  * Request as many blocks as possible on this piece from
  * the peer. If the piece becomes full we call cm_on_piece_full.
@@ -331,18 +399,29 @@ cm_piece_assign_requests(struct piece *pc, struct peer *p)
 {
     assert(!piece_full(pc) && !peer_laden(p));
     unsigned count = 0;
-    for (uint32_t i = 0; !piece_full(pc) && !peer_laden(p); i++) {
-	if (has_bit(pc->have_field, i) || has_bit(pc->down_field, i))
-	    continue;
-	set_bit(pc->down_field, i);
+    do {
+	while ((has_bit(pc->have_field, pc->next_block)
+		   || has_bit(pc->down_field, pc->next_block)))
+	    INCNEXTBLOCK(pc);
+
+	struct block *blk = &pc->blocks[pc->next_block];
+	struct block_request *req = btpd_malloc(sizeof(*req));
+	req->p = p;
+	req->blk = blk;
+	BTPDQ_INSERT_TAIL(&blk->reqs, req, blk_entry);
+	
+	peer_request(p, req);
+
+	set_bit(pc->down_field, pc->next_block);
 	pc->nbusy++;
-	uint32_t start = i * PIECE_BLOCKLEN;
-	uint32_t len = torrent_block_size(pc, i);
-	peer_request(p, pc->index, start, len);
+	pc->nreqs++;
 	count++;
-    }
+	INCNEXTBLOCK(pc);
+    } while (!piece_full(pc) && !peer_laden(p));
+
     if (piece_full(pc))
 	cm_on_piece_full(pc);
+
     return count;
 }
 
@@ -390,74 +469,118 @@ cm_assign_requests(struct peer *p)
 void
 cm_unassign_requests(struct peer *p)
 {
-    struct torrent *tp = p->tp;
-
-    struct piece *pc = BTPDQ_FIRST(&tp->getlst);
-    while (pc != NULL) {
+    while (p->nreqs_out > 0) {
+	struct block_request *req = BTPDQ_FIRST(&p->my_reqs);
+	struct piece *pc = req->blk->pc;
 	int was_full = piece_full(pc);
 
-	struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
-	while (nl != NULL) {
-	    struct nb_link *next = BTPDQ_NEXT(nl, entry);
-
-	    if (pc->index == nb_get_index(nl->nb)) {
-		uint32_t block = nb_get_begin(nl->nb) / PIECE_BLOCKLEN;
-		// XXX: Needs to be looked at if we introduce snubbing.
-		assert(has_bit(pc->down_field, block));
-		clear_bit(pc->down_field, block);
-		pc->nbusy--;
-		BTPDQ_REMOVE(&p->my_reqs, nl, entry);
-		nb_drop(nl->nb);
-		free(nl);
-	    }
-	    
-	    nl = next;
+	while (req != NULL) {
+	    struct block_request *next = BTPDQ_NEXT(req, p_entry);
+
+	    uint32_t blki = nb_get_begin(req->blk->msg) / PIECE_BLOCKLEN;
+	    struct block *blk = req->blk;
+	    // XXX: Needs to be looked at if we introduce snubbing.
+	    assert(has_bit(pc->down_field, blki));
+	    clear_bit(pc->down_field, blki);
+	    pc->nbusy--;
+	    BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+	    p->nreqs_out--;
+	    BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
+	    free(req);
+	    pc->nreqs--;
+
+	    while (next != NULL && next->blk->pc != pc)
+		next = BTPDQ_NEXT(next, p_entry);
+	    req = next;
 	}
-	
+
 	if (was_full && !piece_full(pc))
 	    cm_on_piece_unfull(pc);
-
-	pc = BTPDQ_NEXT(pc, entry);
     }
-
     assert(BTPDQ_EMPTY(&p->my_reqs));
-    p->nreqs_out = 0;
 }
 
-
-void
+static void
 cm_piece_assign_requests_eg(struct piece *pc, struct peer *p)
 {
-    for (uint32_t i = 0; i < pc->nblocks; i++) {
-	if (!has_bit(pc->have_field, i)) {
-	    uint32_t start = i * PIECE_BLOCKLEN;
-	    uint32_t len = torrent_block_size(pc, i);
-	    peer_request(p, pc->index, start, len);
+    unsigned first_block = pc->next_block;
+    do {
+	if ((has_bit(pc->have_field, pc->next_block)
+		|| peer_requested(p, &pc->blocks[pc->next_block]))) {
+	    INCNEXTBLOCK(pc);
+	    continue;
 	}
-    }
+	struct block_request *req = btpd_calloc(1, sizeof(*req));
+	req->blk = &pc->blocks[pc->next_block];
+	req->p = p;
+	BTPDQ_INSERT_TAIL(&pc->blocks[pc->next_block].reqs, req, blk_entry);
+	pc->nreqs++;
+	INCNEXTBLOCK(pc);
+	peer_request(p, req);
+    } while (!peer_laden(p) && pc->next_block != first_block);
 }
 
 void
 cm_assign_requests_eg(struct peer *p)
 {
+    assert(!peer_laden(p));
     struct torrent *tp = p->tp;
-    struct piece *pc;
-    BTPDQ_FOREACH(pc, &tp->getlst, entry) {
-	if (peer_has(p, pc->index))
+    struct piece_tq tmp;
+    BTPDQ_INIT(&tmp);
+
+    struct piece *pc = BTPDQ_FIRST(&tp->getlst);
+    while (!peer_laden(p) && pc != NULL) {
+	struct piece *next = BTPDQ_NEXT(pc, entry);
+	if (peer_has(p, pc->index) && pc->nblocks != pc->ngot) {
 	    cm_piece_assign_requests_eg(pc, p);
+	    BTPDQ_REMOVE(&tp->getlst, pc, entry);
+	    BTPDQ_INSERT_HEAD(&tmp, pc, entry);
+	}
+	pc = next;
+    }
+
+    pc = BTPDQ_FIRST(&tmp);
+    while (pc != NULL) {
+	struct piece *next = BTPDQ_NEXT(pc, entry);
+	cm_piece_insert_eg(pc);
+	pc = next;
     }
 }
 
 void
 cm_unassign_requests_eg(struct peer *p)
 {
-    struct nb_link *nl = BTPDQ_FIRST(&p->my_reqs);
-    while (nl != NULL) {
-	struct nb_link *next = BTPDQ_NEXT(nl, entry);
-	nb_drop(nl->nb);
-	free(nl);
-	nl = next;
+    struct block_request *req;
+    struct piece *pc;
+    struct piece_tq tmp;
+    BTPDQ_INIT(&tmp);
+
+    while (p->nreqs_out > 0) {
+	req = BTPDQ_FIRST(&p->my_reqs);
+
+	pc = req->blk->pc;
+	BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+	BTPDQ_INSERT_HEAD(&tmp, pc, entry);
+	
+	while (req != NULL) {
+	    struct block_request *next = BTPDQ_NEXT(req, p_entry);
+	    BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+	    p->nreqs_out--;
+	    BTPDQ_REMOVE(&req->blk->reqs, req, blk_entry);
+	    free(req);
+	    pc->nreqs--;
+
+	    while (next != NULL && next->blk->pc != pc)
+		next = BTPDQ_NEXT(next, p_entry);
+	    req = next;
+	}
+    }
+    assert(BTPDQ_EMPTY(&p->my_reqs));
+
+    pc = BTPDQ_FIRST(&tmp);
+    while (pc != NULL) {
+	struct piece *next = BTPDQ_NEXT(pc, entry);
+	cm_piece_insert_eg(pc);
+	pc = next;
     }
-    BTPDQ_INIT(&p->my_reqs);
-    p->nreqs_out = 0;
 }
diff --git a/btpd/torrent.h b/btpd/torrent.h
index 6398eb4..e5b7b2c 100644
--- a/btpd/torrent.h
+++ b/btpd/torrent.h
@@ -3,14 +3,25 @@
 
 #define PIECE_BLOCKLEN (1 << 14)
 
+struct block {
+    struct piece *pc;
+    struct net_buf *msg;
+    struct block_request_tq reqs;
+};
+
 struct piece {
     struct torrent *tp;
 
     uint32_t index;
-    unsigned nblocks;
 
+    unsigned nreqs;
+
+    unsigned nblocks;
     unsigned ngot;
     unsigned nbusy;
+    unsigned next_block;
+
+    struct block *blocks;
 
     uint8_t *have_field;
     uint8_t *down_field;