Diffstat (limited to 'btpd/download_subr.c')
-rw-r--r--	btpd/download_subr.c	586
1 file changed, 586 insertions, 0 deletions
diff --git a/btpd/download_subr.c b/btpd/download_subr.c
new file mode 100644
index 0000000..f0429f0
--- /dev/null
+++ b/btpd/download_subr.c
@@ -0,0 +1,586 @@
+/*
+ * The commandments:
+ *
+ * A peer is wanted except when it only has pieces we've already
+ * downloaded or fully requested. Thus, a peer's wanted count is
+ * increased for each missing or unfull piece it announces, or
+ * when a piece it has becomes unfull.
+ *
+ * When a peer we want unchokes us, requests will primarily
+ * be put on pieces we're already downloading, and only then
+ * on new pieces.
+ *
+ * When choosing between several different new pieces to start
+ * downloading, the rarest piece will be chosen.
+ *
+ * End game mode sets in when all missing blocks are requested.
+ * In end game mode no piece is counted as full unless it's
+ * downloaded.
+ *
+ */
+
+#include <fcntl.h>
+#include <math.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <openssl/sha.h>
+
+#include "btpd.h"
+#include "stream.h"
+
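+/*
+ * Allocate a piece and mark it as busy. The struct, its down field
+ * and its block array are carved out of a single allocation, while
+ * the have field points into the torrent's shared block field, so
+ * blocks that were downloaded earlier are already accounted for in
+ * pc->ngot.
+ */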
+static struct piece *
+piece_alloc(struct torrent *tp, uint32_t index)
+{
+    assert(!has_bit(tp->busy_field, index)
+	&& tp->npcs_busy < tp->meta.npieces);
+    struct piece *pc;
+    size_t mem, field, blocks;
+    unsigned nblocks;
+    off_t piece_length = torrent_piece_size(tp, index);
+
+    nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN);
+    blocks = sizeof(pc->blocks[0]) * nblocks;
+    field = (size_t)ceil(nblocks / 8.0);
+    mem = sizeof(*pc) + field + blocks;
+
+    pc = btpd_calloc(1, mem);
+    pc->tp = tp;
+    pc->down_field = (uint8_t *)(pc + 1);
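+    /* Each piece owns ceil(piece_length / (8 * block length)) bytes of
+     * the shared block field; the (1 << 17) below presumes a block
+     * length of (1 << 14), i.e. eight blocks per byte of bit field. */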
+    pc->have_field =
+	tp->block_field +
+	index * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17));
+
+    pc->index = index;
+    pc->nblocks = nblocks;
+
+    pc->nreqs = 0;
+    pc->next_block = 0;
+
+    for (unsigned i = 0; i < nblocks; i++)
+	if (has_bit(pc->have_field, i))
+	    pc->ngot++;
+
+    pc->blocks = (struct block *)(pc->down_field + field);
+    for (unsigned i = 0; i < nblocks; i++) {
+	uint32_t start = i * PIECE_BLOCKLEN;
+	uint32_t len = torrent_block_size(pc, i);
+	struct block *blk = &pc->blocks[i];
+	blk->pc = pc;
+	BTPDQ_INIT(&blk->reqs);
+	blk->msg = nb_create_request(index, start, len);
+	nb_hold(blk->msg);
+    }
+
+    tp->npcs_busy++;
+    set_bit(tp->busy_field, index);
+    BTPDQ_INSERT_HEAD(&tp->getlst, pc, entry);
+    return pc;
+}
+
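+/*
+ * Undo piece_alloc: clear the piece's busy state, free any remaining
+ * requests on its blocks and drop the reference each block holds on
+ * its request message.
+ */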
+void
+piece_free(struct piece *pc)
+{
+    struct torrent *tp = pc->tp;
+    assert(tp->npcs_busy > 0);
+    tp->npcs_busy--;
+    clear_bit(tp->busy_field, pc->index);
+    BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+    for (unsigned i = 0; i < pc->nblocks; i++) {
+	struct block_request *req = BTPDQ_FIRST(&pc->blocks[i].reqs);
+	while (req != NULL) {
+	    struct block_request *next = BTPDQ_NEXT(req, blk_entry);
+	    free(req);
+	    req = next;
+	}
+	nb_drop(pc->blocks[i].msg);
+    }
+    free(pc);
+}
+
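+/* A piece is full when every block is either downloaded or requested. */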
+int
+piece_full(struct piece *pc)
+{
+    return pc->ngot + pc->nbusy == pc->nblocks;
+}
+
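+/*
+ * End game should be entered when every piece is either downloaded
+ * or busy, and every busy piece is full, i.e. when all missing
+ * blocks have been requested.
+ */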
+static int
+dl_should_enter_endgame(struct torrent *tp)
+{
+    int should;
+    if (tp->have_npieces + tp->npcs_busy == tp->meta.npieces) {
+	should = 1;
+	struct piece *pc;
+	BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+	    if (!piece_full(pc)) {
+		should = 0;
+		break;
+	    }
+	}
+    } else
+	should = 0;
+    return should;
+}
+
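+/*
+ * Insert the piece in the get list, which during end game is kept
+ * sorted by requests per missing block, fewest first. Pieces with
+ * all blocks downloaded go to the tail.
+ */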
+static void
+dl_piece_insert_eg(struct piece *pc)
+{
+    struct piece_tq *getlst = &pc->tp->getlst;
+    if (pc->nblocks == pc->ngot)
+	BTPDQ_INSERT_TAIL(getlst, pc, entry);
+    else {
+	unsigned r = pc->nreqs / (pc->nblocks - pc->ngot);
+	struct piece *it;
+	BTPDQ_FOREACH(it, getlst, entry) {
+	    if ((it->nblocks == it->ngot
+		    || r < it->nreqs / (it->nblocks - it->ngot))) {
+		BTPDQ_INSERT_BEFORE(it, pc, entry);
+		break;
+	    }
+	}
+	if (it == NULL)
+	    BTPDQ_INSERT_TAIL(getlst, pc, entry);
+    }
+}
+
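+/* Re-sort the piece after its request or block count has changed. */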
+void
+dl_piece_reorder_eg(struct piece *pc)
+{
+    BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+    dl_piece_insert_eg(pc);
+}
+
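+/*
+ * Enter end game mode: clear all down fields so that blocks may be
+ * requested from more than one peer, re-sort the get list, and hand
+ * out requests to every peer that can take them.
+ */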
+static void
+dl_enter_endgame(struct torrent *tp)
+{
+    struct peer *p;
+    struct piece *pc;
+    struct piece *pcs[tp->npcs_busy];
+    unsigned pi;
+
+    btpd_log(BTPD_L_POL, "Entering end game\n");
+    tp->endgame = 1;
+
+    pi = 0;
+    BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+	for (unsigned i = 0; i < pc->nblocks; i++)
+	    clear_bit(pc->down_field, i);
+	pc->nbusy = 0;
+	pcs[pi] = pc;
+	pi++;
+    }
+    BTPDQ_INIT(&tp->getlst);
+    while (pi > 0) {
+	pi--;
+	dl_piece_insert_eg(pcs[pi]);
+    }
+    BTPDQ_FOREACH(p, &tp->peers, p_entry) {
+	assert(p->nwant == 0);
+	BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+	    if (peer_has(p, pc->index))
+		peer_want(p, pc->index);
+	}
+	if (p->nwant > 0 && peer_leech_ok(p) && !peer_laden(p))
+	    dl_assign_requests_eg(p);
+    }
+}
+
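+/* Return the busy piece with the given index, or NULL. */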
+struct piece *
+dl_find_piece(struct torrent *tp, uint32_t index)
+{
+    struct piece *pc;
+    BTPDQ_FOREACH(pc, &tp->getlst, entry)
+	if (pc->index == index)
+	    break;
+    return pc;
+}
+
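+/*
+ * Compare the computed hash against the expected one, taken from the
+ * in-memory hash list if we have it, and otherwise read straight from
+ * the torrent file on disk.
+ */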
+static int
+test_hash(struct torrent *tp, uint8_t *hash, unsigned long index)
+{
+    if (tp->meta.piece_hash != NULL)
+	return memcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH);
+    else {
+	char piece_hash[SHA_DIGEST_LENGTH];
+	int fd;
+	int bufi;
+	int err;
+
+	err = vopen(&fd, O_RDONLY, "%s/torrent", tp->relpath);
+	if (err != 0)
+	    btpd_err("test_hash: %s\n", strerror(err));
+
+	if (lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH,
+		SEEK_SET) < 0)
+	    btpd_err("test_hash: %s\n", strerror(errno));
+
+	bufi = 0;
+	while (bufi < SHA_DIGEST_LENGTH) {
+	    ssize_t nread =
+		read(fd, piece_hash + bufi, SHA_DIGEST_LENGTH - bufi);
+	    if (nread <= 0)
+		btpd_err("test_hash: %s\n",
+		    nread == 0 ? "unexpected end of file" : strerror(errno));
+	    bufi += nread;
+	}
+	close(fd);
+
+	return memcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
+    }
+}
+
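+/* Callback for bts_open_ro; opens content files read only. */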
+static int
+ro_fd_cb(const char *path, int *fd, void *arg)
+{
+    struct torrent *tp = arg;
+    return vopen(fd, O_RDONLY, "%s/content/%s", tp->relpath, path);
+}
+
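+/*
+ * Compute the SHA1 hash of a completed piece and dispatch to
+ * dl_on_ok_piece or dl_on_bad_piece depending on the result.
+ */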
+static void
+torrent_test_piece(struct piece *pc)
+{
+    struct torrent *tp = pc->tp;
+    int err;
+    uint8_t hash[SHA_DIGEST_LENGTH];
+    struct bt_stream_ro *bts;
+    off_t plen = torrent_piece_size(tp, pc->index);
+
+    if ((bts = bts_open_ro(&tp->meta, pc->index * tp->meta.piece_length,
+	     ro_fd_cb, tp)) == NULL)
+	btpd_err("Out of memory.\n");
+
+    if ((err = bts_sha(bts, plen, hash)) != 0)
+	btpd_err("Ouch! %s\n", strerror(err));
+
+    bts_close_ro(bts);
+
+    if (test_hash(tp, hash, pc->index) == 0)
+	dl_on_ok_piece(pc);
+    else
+	dl_on_bad_piece(pc);
+}
+
+void
+dl_on_piece(struct piece *pc)
+{
+    torrent_test_piece(pc);
+}
+
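+/* May we start downloading this piece from this peer? */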
+static int
+dl_piece_startable(struct peer *p, uint32_t index)
+{
+    return peer_has(p, index) && !has_bit(p->tp->piece_field, index)
+	&& !has_bit(p->tp->busy_field, index);
+}
+
+/*
+ * Find the rarest piece the peer has that isn't already allocated
+ * for download or already downloaded. If no such piece can be
+ * found, return ENOENT.
+ *
+ * Return 0 or ENOENT, with the chosen index in *res.
+ */
+static int
+dl_choose_rarest(struct peer *p, uint32_t *res)
+{
+    uint32_t i;
+    struct torrent *tp = p->tp;
+
+    assert(tp->endgame == 0);
+
+    for (i = 0; i < tp->meta.npieces && !dl_piece_startable(p, i); i++)
+	;
+
+    if (i == tp->meta.npieces)
+	return ENOENT;
+
+    uint32_t min_i = i;
+    uint32_t min_c = 1;
+    for (i++; i < tp->meta.npieces; i++) {
+	if (dl_piece_startable(p, i)) {
+	    if (tp->piece_count[i] == tp->piece_count[min_i])
+		min_c++;
+	    else if (tp->piece_count[i] < tp->piece_count[min_i]) {
+		min_i = i;
+		min_c = 1;
+	    }
+	}
+    }
+    if (min_c > 1) {
+	/* Pick one of the min_c rarest startable pieces at random.
+	 * random() ranges over [0, 2^31 - 1] regardless of RAND_MAX,
+	 * so don't scale by it. */
+	min_c = 1 + random() % min_c;
+	for (i = min_i; min_c > 0; i++) {
+	    if (dl_piece_startable(p, i)
+		&& tp->piece_count[i] == tp->piece_count[min_i]) {
+		min_c--;
+		min_i = i;
+	    }
+	}
+    }
+    *res = min_i;
+    return 0;
+}
+
+/*
+ * Called from either dl_piece_assign_requests or dl_new_piece,
+ * when a piece becomes full. The wanted level of the peers
+ * that have this piece will be decreased. This function is
+ * the only one that may trigger end game.
+ */
+static void
+dl_on_piece_full(struct piece *pc)
+{
+    struct peer *p;
+    BTPDQ_FOREACH(p, &pc->tp->peers, p_entry) {
+	if (peer_has(p, pc->index))
+	    peer_unwant(p, pc->index);
+    }
+    if (dl_should_enter_endgame(pc->tp))
+	dl_enter_endgame(pc->tp);
+}
+
+/*
+ * Allocate the piece indicated by the index for download.
+ * There's a small possibility that a piece is fully downloaded
+ * but hasn't been tested yet. If that is the case, the piece will
+ * be tested and NULL will be returned. Also, we might then enter
+ * end game.
+ *
+ * Return the piece or NULL.
+ */
+struct piece *
+dl_new_piece(struct torrent *tp, uint32_t index)
+{
+    btpd_log(BTPD_L_POL, "Started on piece %u.\n", index);
+    struct piece *pc = piece_alloc(tp, index);
+    if (pc->ngot == pc->nblocks) {
+	dl_on_piece_full(pc);
+	dl_on_piece(pc);
+	if (dl_should_enter_endgame(tp))
+	    dl_enter_endgame(tp);
+	return NULL;
+    } else
+	return pc;
+}
+
+/*
+ * Called when a previously full piece loses a peer.
+ * This is needed because we have decreased the wanted
+ * level for the peers that have this piece when it got
+ * full. Thus we have to increase the wanted level and
+ * try to assign requests for this piece.
+ */
+void
+dl_on_piece_unfull(struct piece *pc)
+{
+    struct torrent *tp = pc->tp;
+    struct peer *p;
+    assert(!piece_full(pc) && tp->endgame == 0);
+    BTPDQ_FOREACH(p, &tp->peers, p_entry)
+	if (peer_has(p, pc->index))
+	    peer_want(p, pc->index);
+    p = BTPDQ_FIRST(&tp->peers);
+    while (p != NULL && !piece_full(pc)) {
+	if (peer_leech_ok(p) && !peer_laden(p))
+	    dl_piece_assign_requests(pc, p); // Cannot provoke end game here.
+	p = BTPDQ_NEXT(p, p_entry);
+    }
+}
+
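+/* Advance the piece's block cursor one step, with wrap-around. */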
+#define INCNEXTBLOCK(pc) \
+    (pc)->next_block = ((pc)->next_block + 1) % (pc)->nblocks
+
+
+/*
+ * Request as many blocks as possible on this piece from
+ * the peer. If the piece becomes full we call dl_on_piece_full.
+ *
+ * Return the number of requests sent.
+ */
+unsigned
+dl_piece_assign_requests(struct piece *pc, struct peer *p)
+{
+    assert(!piece_full(pc) && !peer_laden(p));
+    unsigned count = 0;
+    do {
+	while ((has_bit(pc->have_field, pc->next_block)
+		   || has_bit(pc->down_field, pc->next_block)))
+	    INCNEXTBLOCK(pc);
+
+	struct block *blk = &pc->blocks[pc->next_block];
+	struct block_request *req = btpd_malloc(sizeof(*req));
+	req->p = p;
+	req->blk = blk;
+	BTPDQ_INSERT_TAIL(&blk->reqs, req, blk_entry);
+
+	peer_request(p, req);
+
+	set_bit(pc->down_field, pc->next_block);
+	pc->nbusy++;
+	pc->nreqs++;
+	count++;
+	INCNEXTBLOCK(pc);
+    } while (!piece_full(pc) && !peer_laden(p));
+
+    if (piece_full(pc))
+	dl_on_piece_full(pc);
+
+    return count;
+}
+
+/*
+ * Request as many blocks as possible from the peer. Puts
+ * requests on already active pieces before starting on new
+ * ones. Care must be taken since end game mode may be triggered
+ * by the calls to dl_piece_assign_requests.
+ *
+ * Returns number of requests sent.
+ *
+ * XXX: should do something smart when deciding on which
+ *      already started piece to put requests on.
+ */
+unsigned
+dl_assign_requests(struct peer *p)
+{
+    assert(!p->tp->endgame && !peer_laden(p));
+    struct piece *pc;
+    struct torrent *tp = p->tp;
+    unsigned count = 0;
+    BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+	if (piece_full(pc) || !peer_has(p, pc->index))
+	    continue;
+	count += dl_piece_assign_requests(pc, p);
+	if (tp->endgame)
+	    break;
+	if (!piece_full(pc))
+	    assert(peer_laden(p));
+	if (peer_laden(p))
+	    break;
+    }
+    while (!peer_laden(p) && !tp->endgame) {
+	uint32_t index;
+	if (dl_choose_rarest(p, &index) == 0) {
+	    pc = dl_new_piece(tp, index);
+	    if (pc != NULL)
+		count += dl_piece_assign_requests(pc, p);
+	} else
+	    break;
+    }
+    return count;
+}
+
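+/*
+ * Free all requests the peer has outstanding. The requests are
+ * processed piece by piece; a piece that stops being full as a
+ * consequence is re-announced through dl_on_piece_unfull.
+ */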
+void
+dl_unassign_requests(struct peer *p)
+{
+    while (p->nreqs_out > 0) {
+	struct block_request *req = BTPDQ_FIRST(&p->my_reqs);
+	struct piece *pc = req->blk->pc;
+	int was_full = piece_full(pc);
+
+	while (req != NULL) {
+	    struct block_request *next = BTPDQ_NEXT(req, p_entry);
+
+	    uint32_t blki = nb_get_begin(req->blk->msg) / PIECE_BLOCKLEN;
+	    struct block *blk = req->blk;
+	    // XXX: Needs to be looked at if we introduce snubbing.
+	    assert(has_bit(pc->down_field, blki));
+	    clear_bit(pc->down_field, blki);
+	    pc->nbusy--;
+	    BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+	    p->nreqs_out--;
+	    BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
+	    free(req);
+	    pc->nreqs--;
+
+	    while (next != NULL && next->blk->pc != pc)
+		next = BTPDQ_NEXT(next, p_entry);
+	    req = next;
+	}
+
+	if (was_full && !piece_full(pc))
+	    dl_on_piece_unfull(pc);
+    }
+    assert(BTPDQ_EMPTY(&p->my_reqs));
+}
+
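+/*
+ * End game version of dl_piece_assign_requests. Since blocks may be
+ * requested from several peers, we only skip blocks we already have
+ * or have already requested from this particular peer.
+ */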
+static void
+dl_piece_assign_requests_eg(struct piece *pc, struct peer *p)
+{
+    unsigned first_block = pc->next_block;
+    do {
+	if ((has_bit(pc->have_field, pc->next_block)
+		|| peer_requested(p, &pc->blocks[pc->next_block]))) {
+	    INCNEXTBLOCK(pc);
+	    continue;
+	}
+	struct block_request *req = btpd_calloc(1, sizeof(*req));
+	req->blk = &pc->blocks[pc->next_block];
+	req->p = p;
+	BTPDQ_INSERT_TAIL(&pc->blocks[pc->next_block].reqs, req, blk_entry);
+	pc->nreqs++;
+	INCNEXTBLOCK(pc);
+	peer_request(p, req);
+    } while (!peer_laden(p) && pc->next_block != first_block);
+}
+
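+/*
+ * Request as many blocks as possible from the peer in end game
+ * mode. Pieces we touch are pulled off the get list and then
+ * re-inserted so the list stays sorted.
+ */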
+void
+dl_assign_requests_eg(struct peer *p)
+{
+    assert(!peer_laden(p));
+    struct torrent *tp = p->tp;
+    struct piece_tq tmp;
+    BTPDQ_INIT(&tmp);
+
+    struct piece *pc = BTPDQ_FIRST(&tp->getlst);
+    while (!peer_laden(p) && pc != NULL) {
+	struct piece *next = BTPDQ_NEXT(pc, entry);
+	if (peer_has(p, pc->index) && pc->nblocks != pc->ngot) {
+	    dl_piece_assign_requests_eg(pc, p);
+	    BTPDQ_REMOVE(&tp->getlst, pc, entry);
+	    BTPDQ_INSERT_HEAD(&tmp, pc, entry);
+	}
+	pc = next;
+    }
+
+    pc = BTPDQ_FIRST(&tmp);
+    while (pc != NULL) {
+	struct piece *next = BTPDQ_NEXT(pc, entry);
+	dl_piece_insert_eg(pc);
+	pc = next;
+    }
+}
+
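+/*
+ * End game version of dl_unassign_requests. The affected pieces are
+ * re-inserted in the get list, since their request counts, and
+ * thereby their sort order, have changed.
+ */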
+void
+dl_unassign_requests_eg(struct peer *p)
+{
+    struct block_request *req;
+    struct piece *pc;
+    struct piece_tq tmp;
+    BTPDQ_INIT(&tmp);
+
+    while (p->nreqs_out > 0) {
+	req = BTPDQ_FIRST(&p->my_reqs);
+
+	pc = req->blk->pc;
+	BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+	BTPDQ_INSERT_HEAD(&tmp, pc, entry);
+
+	while (req != NULL) {
+	    struct block_request *next = BTPDQ_NEXT(req, p_entry);
+	    BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+	    p->nreqs_out--;
+	    BTPDQ_REMOVE(&req->blk->reqs, req, blk_entry);
+	    free(req);
+	    pc->nreqs--;
+
+	    while (next != NULL && next->blk->pc != pc)
+		next = BTPDQ_NEXT(next, p_entry);
+	    req = next;
+	}
+    }
+    assert(BTPDQ_EMPTY(&p->my_reqs));
+
+    pc = BTPDQ_FIRST(&tmp);
+    while (pc != NULL) {
+	struct piece *next = BTPDQ_NEXT(pc, entry);
+	dl_piece_insert_eg(pc);
+	pc = next;
+    }
+}