From 005ce9d2c20c03d6fc40b80c087c5fe2c6716e75 Mon Sep 17 00:00:00 2001
From: Richard Nyberg
Date: Tue, 29 Nov 2005 13:58:19 +0000
Subject: * Removed the heartbeat and btpd_seconds. Note that this breaks
 the tracker.

* Renamed the policy* files to upload* and download*.
* The upload (un)choker is now global instead of per torrent. However, the
  algorithm is not yet implemented. To be continued...

---
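Note (not part of the commit): choke_do() in the new upload.c starts out as a
stub that unchokes every interested peer. One plausible shape for the coming
global algorithm is the rate-sorted unchoker from the removed policy_choke.c,
lifted from per-torrent state onto the global m_peerq. The sketch below is
speculative: the four upload slots and the download-rate sort key are
assumptions carried over from the old choke_alg(), and a real version would
need a combined up/down metric, since a global choker serves seeding and
leeching torrents at once.

    static int
    rate_cmp(const void *arg1, const void *arg2)
    {
        long r1 = (*(struct peer **)arg1)->rate_dwn;
        long r2 = (*(struct peer **)arg2)->rate_dwn;
        return r1 < r2 ? -1 : r1 > r2 ? 1 : 0;
    }

    static void
    choke_do(void)
    {
        if (m_npeers == 0)
            return;
        struct peer *p, *sort[m_npeers];
        unsigned i = 0, nslots = 0;
        BTPDQ_FOREACH(p, &m_peerq, ul_entry)
            sort[i++] = p;
        qsort(sort, m_npeers, sizeof(sort[0]), rate_cmp);
        // Fastest peers first; interested ones consume the slots.
        for (int j = (int)m_npeers - 1; j >= 0; j--) {
            if (nslots < 4) {
                if (sort[j]->flags & PF_P_WANT)
                    nslots++;
                if (sort[j]->flags & PF_I_CHOKE)
                    peer_unchoke(sort[j]);
            } else if ((sort[j]->flags & PF_I_CHOKE) == 0)
                peer_choke(sort[j]);
        }
    }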
 btpd/Makefile.am     |   5 +-
 btpd/btpd.c          |  20 +-
 btpd/btpd.h          |   5 +-
 btpd/cli_if.c        |   1 -
 btpd/download.c      | 228 ++++++++++++++++++++
 btpd/download.h      |  37 ++++
 btpd/download_subr.c | 586 +++++++++++++++++++++++++++++++++++++++++++++++++++
 btpd/opts.h          |   5 +
 btpd/peer.c          |   4 +-
 btpd/peer.h          |   2 +-
 btpd/policy.h        |  48 -----
 btpd/policy_choke.c  |  90 --------
 btpd/policy_if.c     | 287 -------------------------
 btpd/policy_subr.c   | 586 ---------------------------------------------------
 btpd/torrent.h       |   4 -
 btpd/tracker_req.c   |   4 +-
 btpd/upload.c        |  72 +++++++
 btpd/upload.h        |  11 +
 18 files changed, 950 insertions(+), 1045 deletions(-)
 create mode 100644 btpd/download.c
 create mode 100644 btpd/download.h
 create mode 100644 btpd/download_subr.c
 delete mode 100644 btpd/policy.h
 delete mode 100644 btpd/policy_choke.c
 delete mode 100644 btpd/policy_if.c
 delete mode 100644 btpd/policy_subr.c
 create mode 100644 btpd/upload.c
 create mode 100644 btpd/upload.h

diff --git a/btpd/Makefile.am b/btpd/Makefile.am
index 718cfbd..0b4d4c8 100644
--- a/btpd/Makefile.am
+++ b/btpd/Makefile.am
@@ -8,9 +8,10 @@ btpd_SOURCES=\
 	net_buf.c net_buf.h\
 	queue.h \
 	peer.c peer.h\
-	policy_choke.c policy_if.c policy_subr.c policy.h\
+	download.c download_subr.c download.h\
 	torrent.c torrent.h\
-	tracker_req.c tracker_req.h
+	tracker_req.c tracker_req.h\
+	upload.c upload.h
 
 btpd_LDADD=../misc/libmisc.a -levent -lcrypto -lm
 btpd_CPPFLAGS=-I$(top_srcdir)/misc @event_CPPFLAGS@ @openssl_CPPFLAGS@
diff --git a/btpd/btpd.c b/btpd/btpd.c
index 026bb26..0866d5a 100644
--- a/btpd/btpd.c
+++ b/btpd/btpd.c
@@ -37,7 +37,6 @@ struct child {
 BTPDQ_HEAD(child_tq, child);
 
 static uint8_t m_peer_id[20];
-static struct event m_heartbeat;
 static struct event m_sigint;
 static struct event m_sigterm;
 static struct event m_sigchld;
@@ -45,8 +44,6 @@ static struct child_tq m_kids = BTPDQ_HEAD_INITIALIZER(m_kids);
 static unsigned m_ntorrents;
 static struct torrent_tq m_torrents = BTPDQ_HEAD_INITIALIZER(m_torrents);
 
-unsigned long btpd_seconds;
-
 void
 btpd_shutdown(void)
 {
@@ -98,19 +95,6 @@ child_cb(int signal, short type, void *arg)
     }
 }
 
-static void
-heartbeat_cb(int sd, short type, void *arg)
-{
-    struct torrent *tp;
-
-    btpd_seconds++;
-
-    BTPDQ_FOREACH(tp, &m_torrents, entry)
-        dl_by_second(tp);
-
-    evtimer_add(&m_heartbeat, (& (struct timeval) { 1, 0 }));
-}
-
 void
 btpd_add_torrent(struct torrent *tp)
 {
@@ -165,6 +149,7 @@ btpd_init(void)
 
     net_init();
     ipc_init();
+    ul_init();
 
     signal(SIGPIPE, SIG_IGN);
@@ -174,7 +159,4 @@
     signal_add(&m_sigterm, NULL);
     signal_set(&m_sigchld, SIGCHLD, child_cb, NULL);
     signal_add(&m_sigchld, NULL);
-
-    evtimer_set(&m_heartbeat, heartbeat_cb, NULL);
-    evtimer_add(&m_heartbeat, (& (struct timeval) { 1, 0 }));
 }
diff --git a/btpd/btpd.h b/btpd/btpd.h
index 4b0a4a0..973976d 100644
--- a/btpd/btpd.h
+++ b/btpd/btpd.h
@@ -22,15 +22,14 @@
 #include "net.h"
 #include "peer.h"
 #include "torrent.h"
-#include "policy.h"
+#include "download.h"
+#include "upload.h"
 #include "subr.h"
 #include "opts.h"
 
 #define BTPD_VERSION (PACKAGE_NAME "/" PACKAGE_VERSION)
 
-extern unsigned long btpd_seconds;
-
 #define BTPD_L_ALL 0xffffffff
 #define BTPD_L_ERROR 0x00000001
 #define BTPD_L_TRACKER 0x00000002
diff --git a/btpd/cli_if.c b/btpd/cli_if.c
index a10d60b..99fcb82 100644
--- a/btpd/cli_if.c
+++ b/btpd/cli_if.c
@@ -38,7 +38,6 @@ cmd_stat(int argc, const char *args, FILE *fp)
     errdie(buf_swrite(&iob, "d"));
     errdie(buf_print(&iob, "6:npeersi%ue", net_npeers));
     errdie(buf_print(&iob, "9:ntorrentsi%ue", btpd_get_ntorrents()));
-    errdie(buf_print(&iob, "7:secondsi%lue", btpd_seconds));
     errdie(buf_swrite(&iob, "8:torrentsl"));
     BTPDQ_FOREACH(tp, btpd_get_torrents(), entry) {
         uint32_t seen_npieces = 0;
diff --git a/btpd/download.c b/btpd/download.c
new file mode 100644
index 0000000..c093029
--- /dev/null
+++ b/btpd/download.c
@@ -0,0 +1,228 @@
+#include <sys/types.h>
+#include <sys/mman.h>
+
+#include "btpd.h"
+#include "tracker_req.h"
+
+/*
+ * Called when a peer announces it's got a new piece.
+ *
+ * If the piece is missing or unfull, we increase the peer's
+ * wanted level and, if possible, call dl_on_download.
+ */
+void
+dl_on_piece_ann(struct peer *p, uint32_t index)
+{
+    struct torrent *tp = p->tp;
+    tp->piece_count[index]++;
+    if (has_bit(tp->piece_field, index))
+        return;
+    struct piece *pc = dl_find_piece(tp, index);
+    if (tp->endgame) {
+        assert(pc != NULL);
+        peer_want(p, index);
+        if (!peer_chokes(p) && !peer_laden(p))
+            dl_assign_requests_eg(p);
+    } else if (pc == NULL) {
+        peer_want(p, index);
+        if (!peer_chokes(p) && !peer_laden(p)) {
+            pc = dl_new_piece(tp, index);
+            if (pc != NULL)
+                dl_piece_assign_requests(pc, p);
+        }
+    } else if (!piece_full(pc)) {
+        peer_want(p, index);
+        if (!peer_chokes(p) && !peer_laden(p))
+            dl_piece_assign_requests(pc, p);
+    }
+}
+
+void
+dl_on_download(struct peer *p)
+{
+    assert(peer_wanted(p));
+    struct torrent *tp = p->tp;
+    if (tp->endgame) {
+        dl_assign_requests_eg(p);
+    } else {
+        unsigned count = dl_assign_requests(p);
+        if (count == 0 && !p->tp->endgame) // We may have entered end game.
+            assert(!peer_wanted(p) || peer_laden(p));
+    }
+}
+
+void
+dl_on_unchoke(struct peer *p)
+{
+    if (peer_wanted(p))
+        dl_on_download(p);
+}
+
+void
+dl_on_undownload(struct peer *p)
+{
+    if (!p->tp->endgame)
+        dl_unassign_requests(p);
+    else
+        dl_unassign_requests_eg(p);
+}
+
+void
+dl_on_choke(struct peer *p)
+{
+    if (p->nreqs_out > 0)
+        dl_on_undownload(p);
+}
+
+/**
+ * Called when a piece has been tested positively.
+ */
+void
+dl_on_ok_piece(struct piece *pc)
+{
+    struct peer *p;
+    struct torrent *tp = pc->tp;
+
+    btpd_log(BTPD_L_POL, "Got piece: %u.\n", pc->index);
+
+    set_bit(tp->piece_field, pc->index);
+    tp->have_npieces++;
+    msync(tp->imem, tp->isiz, MS_ASYNC);
+
+    struct net_buf *have = nb_create_have(pc->index);
+    BTPDQ_FOREACH(p, &tp->peers, p_entry)
+        peer_send(p, have);
+
+    if (tp->endgame)
+        BTPDQ_FOREACH(p, &tp->peers, p_entry)
+            if (peer_has(p, pc->index))
+                peer_unwant(p, pc->index);
+
+    assert(pc->nreqs == 0);
+    piece_free(pc);
+
+    if (torrent_has_all(tp)) {
+        btpd_log(BTPD_L_BTPD, "Finished: %s.\n", tp->relpath);
+        tracker_req(tp, TR_COMPLETED);
+        BTPDQ_FOREACH(p, &tp->peers, p_entry)
+            assert(p->nwant == 0);
+    }
+}
+
+/*
+ * Called when a piece has been tested negatively.
+ */
+void
+dl_on_bad_piece(struct piece *pc)
+{
+    struct torrent *tp = pc->tp;
+
+    btpd_log(BTPD_L_ERROR, "Bad hash for piece %u of %s.\n",
+        pc->index, tp->relpath);
+
+    for (uint32_t i = 0; i < pc->nblocks; i++) {
+        clear_bit(pc->down_field, i);
+        clear_bit(pc->have_field, i);
+    }
+    pc->ngot = 0;
+    pc->nbusy = 0;
+    msync(tp->imem, tp->isiz, MS_ASYNC);
+
+    if (tp->endgame) {
+        struct peer *p;
+        BTPDQ_FOREACH(p, &tp->peers, p_entry) {
+            if (peer_has(p, pc->index) && peer_leech_ok(p) && !peer_laden(p))
+                dl_assign_requests_eg(p);
+        }
+    } else
+        dl_on_piece_unfull(pc); // XXX: May get bad data again.
+}
+
+void
+dl_on_new_peer(struct peer *p)
+{
+    struct torrent *tp = p->tp;
+    tp->npeers++;
+    p->flags |= PF_ATTACHED;
+    BTPDQ_REMOVE(&net_unattached, p, p_entry);
+    BTPDQ_INSERT_HEAD(&tp->peers, p, p_entry);
+    ul_on_new_peer(p);
+}
+
+void
+dl_on_lost_peer(struct peer *p)
+{
+    struct torrent *tp = p->tp;
+
+    assert(tp->npeers > 0 && (p->flags & PF_ATTACHED) != 0);
+    tp->npeers--;
+    p->flags &= ~PF_ATTACHED;
+
+    ul_on_lost_peer(p);
+
+    for (uint32_t i = 0; i < tp->meta.npieces; i++)
+        if (peer_has(p, i))
+            tp->piece_count[i]--;
+
+    if (p->nreqs_out > 0)
+        dl_on_undownload(p);
+#if 0
+    struct piece *pc = BTPDQ_FIRST(&tp->getlst);
+    while (pc != NULL) {
+        struct piece *next = BTPDQ_NEXT(pc, entry);
+        if (peer_has(p, pc->index) && tp->piece_count[pc->index] == 0)
+            dl_on_peerless_piece(pc);
+        pc = next;
+    }
+#endif
+}
+
+void
+dl_on_block(struct peer *p, struct block_request *req,
+    uint32_t index, uint32_t begin, uint32_t length, const char *data)
+{
+    struct torrent *tp = p->tp;
+    struct block *blk = req->blk;
+    struct piece *pc = blk->pc;
+
+    off_t cbegin = index * p->tp->meta.piece_length + begin;
+    torrent_put_bytes(p->tp, data, cbegin, length);
+
+    set_bit(pc->have_field, begin / PIECE_BLOCKLEN);
+    pc->ngot++;
+
+    if (tp->endgame) {
+        struct block_request *req;
+        struct net_buf *cancel = nb_create_cancel(index, begin, length);
+        nb_hold(cancel);
+        BTPDQ_FOREACH(req, &blk->reqs, blk_entry) {
+            if (req->p != p)
+                peer_cancel(req->p, req, cancel);
+            pc->nreqs--;
+        }
+        nb_drop(cancel);
+        dl_piece_reorder_eg(pc);
+        req = BTPDQ_FIRST(&blk->reqs);
+        while (req != NULL) {
+            struct block_request *next = BTPDQ_NEXT(req, blk_entry);
+            if (peer_leech_ok(req->p) && !peer_laden(req->p))
+                dl_assign_requests_eg(req->p);
+            free(req);
+            req = next;
+        }
+        BTPDQ_INIT(&blk->reqs);
+        if (pc->ngot == pc->nblocks)
+            dl_on_piece(pc);
+    } else {
+        BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
+        free(req);
+        pc->nreqs--;
+        // XXX: Needs to be looked at if we introduce snubbing.
+        clear_bit(pc->down_field, begin / PIECE_BLOCKLEN);
+        pc->nbusy--;
+        if (pc->ngot == pc->nblocks)
+            dl_on_piece(pc);
+        if (peer_leech_ok(p) && !peer_laden(p))
+            dl_assign_requests(p);
+    }
+}
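Note: the bookkeeping in dl_on_block() above relies on every block_request
being linked into two tail queues at once: the requesting peer's my_reqs
(via p_entry) and the target block's reqs (via blk_entry), so a request can
be unlinked from either direction. A rough sketch of the relationships, with
declarations paraphrased from the surrounding code rather than quoted from
peer.h or torrent.h:

    struct block {
        struct piece *pc;             // owning piece
        struct net_buf *msg;          // prebuilt REQUEST message
        struct block_request_tq reqs; // all outstanding requests for it
    };

    struct block_request {
        struct peer *p;                       // who asked
        struct block *blk;                    // what was asked for
        BTPDQ_ENTRY(block_request) p_entry;   // link in p->my_reqs
        BTPDQ_ENTRY(block_request) blk_entry; // link in blk->reqs
    };

In end game several requests for the same block may sit on blk->reqs, one
per peer; that is why dl_on_block() walks the whole queue and cancels every
peer except the one that actually delivered the data.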
diff --git a/btpd/download.h b/btpd/download.h
new file mode 100644
index 0000000..cbb83f5
--- /dev/null
+++ b/btpd/download.h
@@ -0,0 +1,37 @@
+#ifndef BTPD_DOWNLOAD_H
+#define BTPD_DOWNLOAD_H
+
+// download_subr.c
+
+int piece_full(struct piece *pc);
+void piece_free(struct piece *pc);
+
+void dl_on_piece_unfull(struct piece *pc);
+void dl_on_piece(struct piece *pc);
+
+struct piece *dl_new_piece(struct torrent *tp, uint32_t index);
+struct piece *dl_find_piece(struct torrent *tp, uint32_t index);
+unsigned dl_piece_assign_requests(struct piece *pc, struct peer *p);
+unsigned dl_assign_requests(struct peer *p);
+void dl_assign_requests_eg(struct peer *p);
+void dl_unassign_requests(struct peer *p);
+void dl_unassign_requests_eg(struct peer *p);
+void dl_piece_reorder_eg(struct piece *pc);
+
+// download.c
+
+void dl_on_new_peer(struct peer *p);
+void dl_on_lost_peer(struct peer *p);
+
+void dl_on_choke(struct peer *p);
+void dl_on_unchoke(struct peer *p);
+void dl_on_download(struct peer *p);
+void dl_on_undownload(struct peer *p);
+void dl_on_piece_ann(struct peer *p, uint32_t index);
+void dl_on_block(struct peer *p, struct block_request *req,
+    uint32_t index, uint32_t begin, uint32_t length, const char *data);
+
+void dl_on_ok_piece(struct piece *pc);
+void dl_on_bad_piece(struct piece *pc);
+
+#endif
diff --git a/btpd/download_subr.c b/btpd/download_subr.c
new file mode 100644
index 0000000..f0429f0
--- /dev/null
+++ b/btpd/download_subr.c
@@ -0,0 +1,586 @@
+/*
+ * The commandments:
+ *
+ * A peer is wanted except when it only has pieces we've already
+ * downloaded or fully requested. Thus, a peer's wanted count is
+ * increased for each missing or unfull piece it announces, or
+ * when a piece it has becomes unfull.
+ *
+ * When a peer we want unchokes us, requests will primarily
+ * be put on pieces we're already downloading and then on
+ * possible new pieces.
+ *
+ * When choosing between several different new pieces to start
+ * downloading, the rarest piece will be chosen.
+ *
+ * End game mode sets in when all missing blocks are requested.
+ * In end game mode no piece is counted as full unless it's
+ * downloaded.
+ */
+
+#include <fcntl.h>
+#include <math.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <openssl/sha.h>
+
+#include "btpd.h"
+#include "stream.h"
+
+static struct piece *
+piece_alloc(struct torrent *tp, uint32_t index)
+{
+    assert(!has_bit(tp->busy_field, index)
+        && tp->npcs_busy < tp->meta.npieces);
+    struct piece *pc;
+    size_t mem, field, blocks;
+    unsigned nblocks;
+    off_t piece_length = torrent_piece_size(tp, index);
+
+    nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN);
+    blocks = sizeof(pc->blocks[0]) * nblocks;
+    field = (size_t)ceil(nblocks / 8.0);
+    mem = sizeof(*pc) + field + blocks;
+
+    pc = btpd_calloc(1, mem);
+    pc->tp = tp;
+    pc->down_field = (uint8_t *)(pc + 1);
+    pc->have_field =
+        tp->block_field +
+        index * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17));
+
+    pc->index = index;
+    pc->nblocks = nblocks;
+
+    pc->nreqs = 0;
+    pc->next_block = 0;
+
+    for (unsigned i = 0; i < nblocks; i++)
+        if (has_bit(pc->have_field, i))
+            pc->ngot++;
+
+    pc->blocks = (struct block *)(pc->down_field + field);
+    for (unsigned i = 0; i < nblocks; i++) {
+        uint32_t start = i * PIECE_BLOCKLEN;
+        uint32_t len = torrent_block_size(pc, i);
+        struct block *blk = &pc->blocks[i];
+        blk->pc = pc;
+        BTPDQ_INIT(&blk->reqs);
+        blk->msg = nb_create_request(index, start, len);
+        nb_hold(blk->msg);
+    }
+
+    tp->npcs_busy++;
+    set_bit(tp->busy_field, index);
+    BTPDQ_INSERT_HEAD(&tp->getlst, pc, entry);
+    return pc;
+}
+
+void
+piece_free(struct piece *pc)
+{
+    struct torrent *tp = pc->tp;
+    assert(tp->npcs_busy > 0);
+    tp->npcs_busy--;
+    clear_bit(tp->busy_field, pc->index);
+    BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+    for (unsigned i = 0; i < pc->nblocks; i++) {
+        struct block_request *req = BTPDQ_FIRST(&pc->blocks[i].reqs);
+        while (req != NULL) {
+            struct block_request *next = BTPDQ_NEXT(req, blk_entry);
+            free(req);
+            req = next;
+        }
+        nb_drop(pc->blocks[i].msg);
+    }
+    free(pc);
+}
+
+int
+piece_full(struct piece *pc)
+{
+    return pc->ngot + pc->nbusy == pc->nblocks;
+}
+
+static int
+dl_should_enter_endgame(struct torrent *tp)
+{
+    int should;
+    if (tp->have_npieces + tp->npcs_busy == tp->meta.npieces) {
+        should = 1;
+        struct piece *pc;
+        BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+            if (!piece_full(pc)) {
+                should = 0;
+                break;
+            }
+        }
+    } else
+        should = 0;
+    return should;
+}
+
+static void
+dl_piece_insert_eg(struct piece *pc)
+{
+    struct piece_tq *getlst = &pc->tp->getlst;
+    if (pc->nblocks == pc->ngot)
+        BTPDQ_INSERT_TAIL(getlst, pc, entry);
+    else {
+        unsigned r = pc->nreqs / (pc->nblocks - pc->ngot);
+        struct piece *it;
+        BTPDQ_FOREACH(it, getlst, entry) {
+            if ((it->nblocks == it->ngot
+                    || r < it->nreqs / (it->nblocks - it->ngot))) {
+                BTPDQ_INSERT_BEFORE(it, pc, entry);
+                break;
+            }
+        }
+        if (it == NULL)
+            BTPDQ_INSERT_TAIL(getlst, pc, entry);
+    }
+}
+
+void
+dl_piece_reorder_eg(struct piece *pc)
+{
+    BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+    dl_piece_insert_eg(pc);
+}
+
+static void
+dl_enter_endgame(struct torrent *tp)
+{
+    struct peer *p;
+    struct piece *pc;
+    struct piece *pcs[tp->npcs_busy];
+    unsigned pi;
+
+    btpd_log(BTPD_L_POL, "Entering end game\n");
+    tp->endgame = 1;
+
+    pi = 0;
+    BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+        for (unsigned i = 0; i < pc->nblocks; i++)
+            clear_bit(pc->down_field, i);
+        pc->nbusy = 0;
+        pcs[pi] = pc;
+        pi++;
+    }
+    BTPDQ_INIT(&tp->getlst);
+    while (pi > 0) {
+        pi--;
+        dl_piece_insert_eg(pcs[pi]);
+    }
+    BTPDQ_FOREACH(p, &tp->peers, p_entry) {
+        assert(p->nwant == 0);
+        BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+            if (peer_has(p, pc->index))
+                peer_want(p, pc->index);
+        }
+        if (p->nwant > 0 && peer_leech_ok(p) && !peer_laden(p))
+            dl_assign_requests_eg(p);
+    }
+}
+
+struct piece *
+dl_find_piece(struct torrent *tp, uint32_t index)
+{
+    struct piece *pc;
+    BTPDQ_FOREACH(pc, &tp->getlst, entry)
+        if (pc->index == index)
+            break;
+    return pc;
+}
+
+static int
+test_hash(struct torrent *tp, uint8_t *hash, unsigned long index)
+{
+    if (tp->meta.piece_hash != NULL)
+        return memcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH);
+    else {
+        char piece_hash[SHA_DIGEST_LENGTH];
+        int fd;
+        int bufi;
+        int err;
+
+        err = vopen(&fd, O_RDONLY, "%s/torrent", tp->relpath);
+        if (err != 0)
+            btpd_err("test_hash: %s\n", strerror(err));
+
+        err = lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH,
+            SEEK_SET);
+        if (err < 0)
+            btpd_err("test_hash: %s\n", strerror(errno));
+
+        bufi = 0;
+        while (bufi < SHA_DIGEST_LENGTH) {
+            ssize_t nread =
+                read(fd, piece_hash + bufi, SHA_DIGEST_LENGTH - bufi);
+            bufi += nread;
+        }
+        close(fd);
+
+        return memcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
+    }
+}
+
+static int
+ro_fd_cb(const char *path, int *fd, void *arg)
+{
+    struct torrent *tp = arg;
+    return vopen(fd, O_RDONLY, "%s/content/%s", tp->relpath, path);
+}
+
+static void
+torrent_test_piece(struct piece *pc)
+{
+    struct torrent *tp = pc->tp;
+    int err;
+    uint8_t hash[20];
+    struct bt_stream_ro *bts;
+    off_t plen = torrent_piece_size(tp, pc->index);
+
+    if ((bts = bts_open_ro(&tp->meta, pc->index * tp->meta.piece_length,
+             ro_fd_cb, tp)) == NULL)
+        btpd_err("Out of memory.\n");
+
+    if ((err = bts_sha(bts, plen, hash)) != 0)
+        btpd_err("Ouch! %s\n", strerror(err));
+
+    bts_close_ro(bts);
+
+    if (test_hash(tp, hash, pc->index) == 0)
+        dl_on_ok_piece(pc);
+    else
+        dl_on_bad_piece(pc);
+}
+
+void
+dl_on_piece(struct piece *pc)
+{
+    torrent_test_piece(pc);
+}
+
+static int
+dl_piece_startable(struct peer *p, uint32_t index)
+{
+    return peer_has(p, index) && !has_bit(p->tp->piece_field, index)
+        && !has_bit(p->tp->busy_field, index);
+}
+
+/*
+ * Find the rarest piece the peer has, that isn't already allocated
+ * for download or already downloaded. If no such piece can be found
+ * return ENOENT.
+ *
+ * Return 0 or ENOENT, index in res.
+ */
+static int
+dl_choose_rarest(struct peer *p, uint32_t *res)
+{
+    uint32_t i;
+    struct torrent *tp = p->tp;
+
+    assert(tp->endgame == 0);
+
+    for (i = 0; i < tp->meta.npieces && !dl_piece_startable(p, i); i++)
+        ;
+
+    if (i == tp->meta.npieces)
+        return ENOENT;
+
+    uint32_t min_i = i;
+    uint32_t min_c = 1;
+    for (i++; i < tp->meta.npieces; i++) {
+        if (dl_piece_startable(p, i)) {
+            if (tp->piece_count[i] == tp->piece_count[min_i])
+                min_c++;
+            else if (tp->piece_count[i] < tp->piece_count[min_i]) {
+                min_i = i;
+                min_c = 1;
+            }
+        }
+    }
+    if (min_c > 1) {
+        min_c = 1 + rint((double)random() * (min_c - 1) / RAND_MAX);
+        for (i = min_i; min_c > 0; i++) {
+            if (dl_piece_startable(p, i)
+                    && tp->piece_count[i] == tp->piece_count[min_i]) {
+                min_c--;
+                min_i = i;
+            }
+        }
+    }
+    *res = min_i;
+    return 0;
+}
+
+/*
+ * Called from either dl_piece_assign_requests or dl_new_piece,
+ * when a piece becomes full. The wanted level of the peers
+ * that have this piece will be decreased. This function is
+ * the only one that may trigger end game.
+ */
+static void
+dl_on_piece_full(struct piece *pc)
+{
+    struct peer *p;
+    BTPDQ_FOREACH(p, &pc->tp->peers, p_entry) {
+        if (peer_has(p, pc->index))
+            peer_unwant(p, pc->index);
+    }
+    if (dl_should_enter_endgame(pc->tp))
+        dl_enter_endgame(pc->tp);
+}
+
+/*
+ * Allocate the piece indicated by the index for download.
+ * There's a small possibility that a piece is fully downloaded
+ * but hasn't been tested. If such is the case the piece will
+ * be tested and NULL will be returned. Also, we might then enter
+ * end game.
+ *
+ * Return the piece or NULL.
+ */
+struct piece *
+dl_new_piece(struct torrent *tp, uint32_t index)
+{
+    btpd_log(BTPD_L_POL, "Started on piece %u.\n", index);
+    struct piece *pc = piece_alloc(tp, index);
+    if (pc->ngot == pc->nblocks) {
+        dl_on_piece_full(pc);
+        dl_on_piece(pc);
+        if (dl_should_enter_endgame(tp))
+            dl_enter_endgame(tp);
+        return NULL;
+    } else
+        return pc;
+}
+
+/*
+ * Called when a previously full piece loses a peer.
+ * This is needed because we have decreased the wanted
+ * level for the peers that have this piece when it got
+ * full. Thus we have to increase the wanted level and
+ * try to assign requests for this piece.
+ */
+void
+dl_on_piece_unfull(struct piece *pc)
+{
+    struct torrent *tp = pc->tp;
+    struct peer *p;
+    assert(!piece_full(pc) && tp->endgame == 0);
+    BTPDQ_FOREACH(p, &tp->peers, p_entry)
+        if (peer_has(p, pc->index))
+            peer_want(p, pc->index);
+    p = BTPDQ_FIRST(&tp->peers);
+    while (p != NULL && !piece_full(pc)) {
+        if (peer_leech_ok(p) && !peer_laden(p))
+            dl_piece_assign_requests(pc, p); // Cannot provoke end game here.
+        p = BTPDQ_NEXT(p, p_entry);
+    }
+}
+
+#define INCNEXTBLOCK(pc) \
+    (pc)->next_block = ((pc)->next_block + 1) % (pc)->nblocks
+
+/*
+ * Request as many blocks as possible on this piece from
+ * the peer. If the piece becomes full we call dl_on_piece_full.
+ *
+ * Return the number of requests sent.
+ */
+unsigned
+dl_piece_assign_requests(struct piece *pc, struct peer *p)
+{
+    assert(!piece_full(pc) && !peer_laden(p));
+    unsigned count = 0;
+    do {
+        while ((has_bit(pc->have_field, pc->next_block)
+                || has_bit(pc->down_field, pc->next_block)))
+            INCNEXTBLOCK(pc);
+
+        struct block *blk = &pc->blocks[pc->next_block];
+        struct block_request *req = btpd_malloc(sizeof(*req));
+        req->p = p;
+        req->blk = blk;
+        BTPDQ_INSERT_TAIL(&blk->reqs, req, blk_entry);
+
+        peer_request(p, req);
+
+        set_bit(pc->down_field, pc->next_block);
+        pc->nbusy++;
+        pc->nreqs++;
+        count++;
+        INCNEXTBLOCK(pc);
+    } while (!piece_full(pc) && !peer_laden(p));
+
+    if (piece_full(pc))
+        dl_on_piece_full(pc);
+
+    return count;
+}
+
+/*
+ * Request as many blocks as possible from the peer. Puts
+ * requests on already active pieces before starting on new
+ * ones. Care must be taken since end game mode may be triggered
+ * by the calls to dl_piece_assign_requests.
+ *
+ * Returns number of requests sent.
+ *
+ * XXX: should do something smart when deciding on which
+ * already started piece to put requests on.
+ */
+unsigned
+dl_assign_requests(struct peer *p)
+{
+    assert(!p->tp->endgame && !peer_laden(p));
+    struct piece *pc;
+    struct torrent *tp = p->tp;
+    unsigned count = 0;
+    BTPDQ_FOREACH(pc, &tp->getlst, entry) {
+        if (piece_full(pc) || !peer_has(p, pc->index))
+            continue;
+        count += dl_piece_assign_requests(pc, p);
+        if (tp->endgame)
+            break;
+        if (!piece_full(pc))
+            assert(peer_laden(p));
+        if (peer_laden(p))
+            break;
+    }
+    while (!peer_laden(p) && !tp->endgame) {
+        uint32_t index;
+        if (dl_choose_rarest(p, &index) == 0) {
+            pc = dl_new_piece(tp, index);
+            if (pc != NULL)
+                count += dl_piece_assign_requests(pc, p);
+        } else
+            break;
+    }
+    return count;
+}
+
+void
+dl_unassign_requests(struct peer *p)
+{
+    while (p->nreqs_out > 0) {
+        struct block_request *req = BTPDQ_FIRST(&p->my_reqs);
+        struct piece *pc = req->blk->pc;
+        int was_full = piece_full(pc);
+
+        while (req != NULL) {
+            struct block_request *next = BTPDQ_NEXT(req, p_entry);
+
+            uint32_t blki = nb_get_begin(req->blk->msg) / PIECE_BLOCKLEN;
+            struct block *blk = req->blk;
+            // XXX: Needs to be looked at if we introduce snubbing.
+            assert(has_bit(pc->down_field, blki));
+            clear_bit(pc->down_field, blki);
+            pc->nbusy--;
+            BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+            p->nreqs_out--;
+            BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
+            free(req);
+            pc->nreqs--;
+
+            while (next != NULL && next->blk->pc != pc)
+                next = BTPDQ_NEXT(next, p_entry);
+            req = next;
+        }
+
+        if (was_full && !piece_full(pc))
+            dl_on_piece_unfull(pc);
+    }
+    assert(BTPDQ_EMPTY(&p->my_reqs));
+}
+
+static void
+dl_piece_assign_requests_eg(struct piece *pc, struct peer *p)
+{
+    unsigned first_block = pc->next_block;
+    do {
+        if ((has_bit(pc->have_field, pc->next_block)
+                || peer_requested(p, &pc->blocks[pc->next_block]))) {
+            INCNEXTBLOCK(pc);
+            continue;
+        }
+        struct block_request *req = btpd_calloc(1, sizeof(*req));
+        req->blk = &pc->blocks[pc->next_block];
+        req->p = p;
+        BTPDQ_INSERT_TAIL(&pc->blocks[pc->next_block].reqs, req, blk_entry);
+        pc->nreqs++;
+        INCNEXTBLOCK(pc);
+        peer_request(p, req);
+    } while (!peer_laden(p) && pc->next_block != first_block);
+}
+
+void
+dl_assign_requests_eg(struct peer *p)
+{
+    assert(!peer_laden(p));
+    struct torrent *tp = p->tp;
+    struct piece_tq tmp;
+    BTPDQ_INIT(&tmp);
+
+    struct piece *pc = BTPDQ_FIRST(&tp->getlst);
+    while (!peer_laden(p) && pc != NULL) {
+        struct piece *next = BTPDQ_NEXT(pc, entry);
+        if (peer_has(p, pc->index) && pc->nblocks != pc->ngot) {
+            dl_piece_assign_requests_eg(pc, p);
+            BTPDQ_REMOVE(&tp->getlst, pc, entry);
+            BTPDQ_INSERT_HEAD(&tmp, pc, entry);
+        }
+        pc = next;
+    }
+
+    pc = BTPDQ_FIRST(&tmp);
+    while (pc != NULL) {
+        struct piece *next = BTPDQ_NEXT(pc, entry);
+        dl_piece_insert_eg(pc);
+        pc = next;
+    }
+}
+
+void
+dl_unassign_requests_eg(struct peer *p)
+{
+    struct block_request *req;
+    struct piece *pc;
+    struct piece_tq tmp;
+    BTPDQ_INIT(&tmp);
+
+    while (p->nreqs_out > 0) {
+        req = BTPDQ_FIRST(&p->my_reqs);
+
+        pc = req->blk->pc;
+        BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
+        BTPDQ_INSERT_HEAD(&tmp, pc, entry);
+
+        while (req != NULL) {
+            struct block_request *next = BTPDQ_NEXT(req, p_entry);
+            BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
+            p->nreqs_out--;
+            BTPDQ_REMOVE(&req->blk->reqs, req, blk_entry);
+            free(req);
+            pc->nreqs--;
+
+            while (next != NULL && next->blk->pc != pc)
+                next = BTPDQ_NEXT(next, p_entry);
+            req = next;
+        }
+    }
+    assert(BTPDQ_EMPTY(&p->my_reqs));
+
+    pc = BTPDQ_FIRST(&tmp);
+    while (pc != NULL) {
+        struct piece *next = BTPDQ_NEXT(pc, entry);
+        dl_piece_insert_eg(pc);
+        pc = next;
+    }
+}
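Note: dl_choose_rarest() above uses a two-pass scheme: one pass to find the
lowest piece count and how many startable pieces tie for it, then a second
pass to pick one of the tied pieces at random. The following stand-alone toy
(hypothetical, not part of the patch; it uses a modulo instead of the rint()
scaling for brevity) shows the same idea on a plain array, without the
peer/torrent plumbing:

    #include <stdio.h>
    #include <stdlib.h>

    /* Pick a random index holding the smallest count. */
    static int
    choose_rarest(const unsigned *count, int n)
    {
        int min_i = 0, ties = 1;
        for (int i = 1; i < n; i++) {
            if (count[i] < count[min_i]) {
                min_i = i;
                ties = 1;
            } else if (count[i] == count[min_i])
                ties++;
        }
        /* Walk to the k:th (1-based) index that ties for the minimum. */
        int k = 1 + random() % ties;
        for (int i = min_i; ; i++)
            if (count[i] == count[min_i] && --k == 0)
                return i;
    }

    int
    main(void)
    {
        unsigned count[] = { 3, 1, 4, 1, 5, 1 };
        printf("%d\n", choose_rarest(count, 6)); /* prints 1, 3 or 5 */
        return 0;
    }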
diff --git a/btpd/opts.h b/btpd/opts.h
index d2d6460..cdf698f 100644
--- a/btpd/opts.h
+++ b/btpd/opts.h
@@ -1,3 +1,6 @@
+#ifndef BTPD_OPTS_H
+#define BTPD_OPTS_H
+
 extern short btpd_daemon;
 extern const char *btpd_dir;
 extern uint32_t btpd_logmask;
@@ -5,3 +8,5 @@ extern unsigned net_max_peers;
 extern unsigned net_bw_limit_in;
 extern unsigned net_bw_limit_out;
 extern int net_port;
+
+#endif
diff --git a/btpd/peer.c b/btpd/peer.c
index 74bf7e8..10e18c9 100644
--- a/btpd/peer.c
+++ b/btpd/peer.c
@@ -383,7 +383,7 @@ peer_on_interest(struct peer *p)
         return;
     else {
         p->flags |= PF_P_WANT;
-        dl_on_interest(p);
+        ul_on_interest(p);
     }
 }
@@ -395,7 +395,7 @@ peer_on_uninterest(struct peer *p)
         return;
     else {
         p->flags &= ~PF_P_WANT;
-        dl_on_uninterest(p);
+        ul_on_uninterest(p);
     }
 }
diff --git a/btpd/peer.h b/btpd/peer.h
index a25c73b..38b053c 100644
--- a/btpd/peer.h
+++ b/btpd/peer.h
@@ -61,7 +61,7 @@ struct peer {
     } net;
 
     BTPDQ_ENTRY(peer) p_entry;
-
+    BTPDQ_ENTRY(peer) ul_entry;
     BTPDQ_ENTRY(peer) rq_entry;
     BTPDQ_ENTRY(peer) wq_entry;
 };
diff --git a/btpd/policy.h b/btpd/policy.h
deleted file mode 100644
index 5adf4a1..0000000
--- a/btpd/policy.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#ifndef BTPD_POLICY_H
-#define BTPD_POLICY_H
-
-// policy_choke.c
-
-void choke_alg(struct torrent *tp);
-void next_optimistic(struct torrent *tp, struct peer *np);
-
-// policy_subr.c
-
-int piece_full(struct piece *pc);
-void piece_free(struct piece *pc);
-
-void dl_on_piece_unfull(struct piece *pc);
-void dl_on_piece(struct piece *pc);
-
-struct piece *dl_new_piece(struct torrent *tp, uint32_t index);
-struct piece *dl_find_piece(struct torrent *tp, uint32_t index);
-unsigned dl_piece_assign_requests(struct piece *pc, struct peer *p);
-unsigned dl_assign_requests(struct peer *p);
-void dl_assign_requests_eg(struct peer *p);
-void dl_unassign_requests(struct peer *p);
-void dl_unassign_requests_eg(struct peer *p);
-void dl_piece_reorder_eg(struct piece *pc);
-
-// policy_if.c
-
-void dl_by_second(struct torrent *tp);
-
-void dl_on_new_peer(struct peer *p);
-void dl_on_lost_peer(struct peer *p);
-
-void dl_on_choke(struct peer *p);
-void dl_on_unchoke(struct peer *p);
-void dl_on_upload(struct peer *p);
-void dl_on_unupload(struct peer *p);
-void dl_on_interest(struct peer *p);
-void dl_on_uninterest(struct peer *p);
-void dl_on_download(struct peer *p);
-void dl_on_undownload(struct peer *p);
-void dl_on_piece_ann(struct peer *p, uint32_t index);
-void dl_on_block(struct peer *p, struct block_request *req,
-    uint32_t index, uint32_t begin, uint32_t length, const char *data);
-
-void dl_on_ok_piece(struct piece *pc);
-void dl_on_bad_piece(struct piece *pc);
-
-#endif
diff --git a/btpd/policy_choke.c b/btpd/policy_choke.c
deleted file mode 100644
index 6da3ae3..0000000
--- a/btpd/policy_choke.c
+++ /dev/null
@@ -1,90 +0,0 @@
-#include "btpd.h"
-
-static int
-rate_cmp(long rate1, long rate2)
-{
-    if (rate1 < rate2)
-        return -1;
-    else if (rate1 == rate2)
-        return 0;
-    else
-        return 1;
-}
-
-static int
-dwnrate_cmp(const void *p1, const void *p2)
-{
-    long rate1 = (*(struct peer **)p1)->rate_dwn;
-    long rate2 = (*(struct peer **)p2)->rate_dwn;
-    return rate_cmp(rate1, rate2);
-}
-
-static int
-uprate_cmp(const void *p1, const void *p2)
-{
-    long rate1 = (*(struct peer **)p1)->rate_up;
-    long rate2 = (*(struct peer **)p2)->rate_up;
-    return rate_cmp(rate1, rate2);
-}
-
-void
-choke_alg(struct torrent *tp)
-{
-    assert(tp->npeers > 0);
-
-    int i;
-    struct peer *p;
-    struct peer *psort[tp->npeers];
-
-    i = 0;
-    BTPDQ_FOREACH(p, &tp->peers, p_entry)
-        psort[i++] = p;
-
-    if (tp->have_npieces == tp->meta.npieces)
-        qsort(psort, tp->npeers, sizeof(p), uprate_cmp);
-    else
-        qsort(psort, tp->npeers, sizeof(p), dwnrate_cmp);
-
-    tp->ndown = 0;
-    if (tp->optimistic != NULL) {
-        if (tp->optimistic->flags & PF_I_CHOKE)
-            peer_unchoke(tp->optimistic);
-        if (tp->optimistic->flags & PF_P_WANT)
-            tp->ndown = 1;
-    }
-
-    for (i = tp->npeers - 1; i >= 0; i--) {
-        if (psort[i] == tp->optimistic)
-            continue;
-        if (tp->ndown < 4) {
-            if (psort[i]->flags & PF_P_WANT)
-                tp->ndown++;
-            if (psort[i]->flags & PF_I_CHOKE)
-                peer_unchoke(psort[i]);
-        } else {
-            if ((psort[i]->flags & PF_I_CHOKE) == 0)
-                peer_choke(psort[i]);
-        }
-    }
-
-    tp->choke_time = btpd_seconds + 10;
-}
-
-void
-next_optimistic(struct torrent *tp, struct peer *np)
-{
-    if (np != NULL)
-        tp->optimistic = np;
-    else if (tp->optimistic == NULL)
-        tp->optimistic = BTPDQ_FIRST(&tp->peers);
-    else {
-        np = BTPDQ_NEXT(tp->optimistic, p_entry);
-        if (np != NULL)
-            tp->optimistic = np;
-        else
-            tp->optimistic = BTPDQ_FIRST(&tp->peers);
-    }
-    assert(tp->optimistic != NULL);
-    choke_alg(tp);
-    tp->opt_time = btpd_seconds + 30;
-}
diff --git a/btpd/policy_if.c b/btpd/policy_if.c
deleted file mode 100644
index 164e2a3..0000000
--- a/btpd/policy_if.c
+++ /dev/null
@@ -1,287 +0,0 @@
-#include <sys/types.h>
-#include <sys/mman.h>
-
-#include "btpd.h"
-#include "tracker_req.h"
-
-void
-dl_by_second(struct torrent *tp)
-{
-    if (btpd_seconds == tp->tracker_time)
-        tracker_req(tp, TR_EMPTY);
-
-    if (btpd_seconds == tp->opt_time)
-        next_optimistic(tp, NULL);
-
-    if (btpd_seconds == tp->choke_time)
-        choke_alg(tp);
-}
-
-/*
- * Called when a peer announces it's got a new piece.
- *
- * If the piece is missing or unfull we increase the peer's
- * wanted level and if possible call dl_on_download.
- */
-void
-dl_on_piece_ann(struct peer *p, uint32_t index)
-{
-    struct torrent *tp = p->tp;
-    tp->piece_count[index]++;
-    if (has_bit(tp->piece_field, index))
-        return;
-    struct piece *pc = dl_find_piece(tp, index);
-    if (tp->endgame) {
-        assert(pc != NULL);
-        peer_want(p, index);
-        if (!peer_chokes(p) && !peer_laden(p))
-            dl_assign_requests_eg(p);
-    } else if (pc == NULL) {
-        peer_want(p, index);
-        if (!peer_chokes(p) && !peer_laden(p)) {
-            pc = dl_new_piece(tp, index);
-            if (pc != NULL)
-                dl_piece_assign_requests(pc, p);
-        }
-    } else if (!piece_full(pc)) {
-        peer_want(p, index);
-        if (!peer_chokes(p) && !peer_laden(p))
-            dl_piece_assign_requests(pc, p);
-    }
-}
-
-void
-dl_on_download(struct peer *p)
-{
-    assert(peer_wanted(p));
-    struct torrent *tp = p->tp;
-    if (tp->endgame) {
-        dl_assign_requests_eg(p);
-    } else {
-        unsigned count = dl_assign_requests(p);
-        if (count == 0 && !p->tp->endgame) // We may have entered end game.
-            assert(!peer_wanted(p) || peer_laden(p));
-    }
-}
-
-void
-dl_on_unchoke(struct peer *p)
-{
-    if (peer_wanted(p))
-        dl_on_download(p);
-}
-
-void
-dl_on_undownload(struct peer *p)
-{
-    if (!p->tp->endgame)
-        dl_unassign_requests(p);
-    else
-        dl_unassign_requests_eg(p);
-}
-
-void
-dl_on_choke(struct peer *p)
-{
-    if (p->nreqs_out > 0)
-        dl_on_undownload(p);
-}
-
-void
-dl_on_upload(struct peer *p)
-{
-    choke_alg(p->tp);
-}
-
-void
-dl_on_interest(struct peer *p)
-{
-    if ((p->flags & PF_I_CHOKE) == 0)
-        dl_on_upload(p);
-}
-
-void
-dl_on_unupload(struct peer *p)
-{
-    choke_alg(p->tp);
-}
-
-void
-dl_on_uninterest(struct peer *p)
-{
-    if ((p->flags & PF_I_CHOKE) == 0)
-        dl_on_unupload(p);
-}
-
-/**
- * Called when a piece has been tested positively.
- */
-void
-dl_on_ok_piece(struct piece *pc)
-{
-    struct peer *p;
-    struct torrent *tp = pc->tp;
-
-    btpd_log(BTPD_L_POL, "Got piece: %u.\n", pc->index);
-
-    set_bit(tp->piece_field, pc->index);
-    tp->have_npieces++;
-    msync(tp->imem, tp->isiz, MS_ASYNC);
-
-    struct net_buf *have = nb_create_have(pc->index);
-    BTPDQ_FOREACH(p, &tp->peers, p_entry)
-        peer_send(p, have);
-
-    if (tp->endgame)
-        BTPDQ_FOREACH(p, &tp->peers, p_entry)
-            if (peer_has(p, pc->index))
-                peer_unwant(p, pc->index);
-
-    assert(pc->nreqs == 0);
-    piece_free(pc);
-
-    if (torrent_has_all(tp)) {
-        btpd_log(BTPD_L_BTPD, "Finished: %s.\n", tp->relpath);
-        tracker_req(tp, TR_COMPLETED);
-        BTPDQ_FOREACH(p, &tp->peers, p_entry)
-            assert(p->nwant == 0);
-    }
-}
-
-/*
- * Called when a piece has been tested negatively.
- */
-void
-dl_on_bad_piece(struct piece *pc)
-{
-    struct torrent *tp = pc->tp;
-
-    btpd_log(BTPD_L_ERROR, "Bad hash for piece %u of %s.\n",
-        pc->index, tp->relpath);
-
-    for (uint32_t i = 0; i < pc->nblocks; i++) {
-        clear_bit(pc->down_field, i);
-        clear_bit(pc->have_field, i);
-    }
-    pc->ngot = 0;
-    pc->nbusy = 0;
-    msync(tp->imem, tp->isiz, MS_ASYNC);
-
-    if (tp->endgame) {
-        struct peer *p;
-        BTPDQ_FOREACH(p, &tp->peers, p_entry) {
-            if (peer_has(p, pc->index) && peer_leech_ok(p) && !peer_laden(p))
-                dl_assign_requests_eg(p);
-        }
-    } else
-        dl_on_piece_unfull(pc); // XXX: May get bad data again.
-}
-
-void
-dl_on_new_peer(struct peer *p)
-{
-    struct torrent *tp = p->tp;
-
-    tp->npeers++;
-    p->flags |= PF_ATTACHED;
-    BTPDQ_REMOVE(&net_unattached, p, p_entry);
-
-    if (tp->npeers == 1) {
-        BTPDQ_INSERT_HEAD(&tp->peers, p, p_entry);
-        next_optimistic(tp, p);
-    } else {
-        if (random() > RAND_MAX / 3)
-            BTPDQ_INSERT_AFTER(&tp->peers, tp->optimistic, p, p_entry);
-        else
-            BTPDQ_INSERT_TAIL(&tp->peers, p, p_entry);
-    }
-}
-
-void
-dl_on_lost_peer(struct peer *p)
-{
-    struct torrent *tp = p->tp;
-
-    tp->npeers--;
-    p->flags &= ~PF_ATTACHED;
-    if (tp->npeers == 0) {
-        BTPDQ_REMOVE(&tp->peers, p, p_entry);
-        tp->optimistic = NULL;
-        tp->choke_time = tp->opt_time = 0;
-    } else if (tp->optimistic == p) {
-        struct peer *next = BTPDQ_NEXT(p, p_entry);
-        BTPDQ_REMOVE(&tp->peers, p, p_entry);
-        next_optimistic(tp, next);
-    } else if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT) {
-        BTPDQ_REMOVE(&tp->peers, p, p_entry);
-        dl_on_unupload(p);
-    } else {
-        BTPDQ_REMOVE(&tp->peers, p, p_entry);
-    }
-
-    for (uint32_t i = 0; i < tp->meta.npieces; i++)
-        if (peer_has(p, i))
-            tp->piece_count[i]--;
-
-    if (p->nreqs_out > 0)
-        dl_on_undownload(p);
-#if 0
-    struct piece *pc = BTPDQ_FIRST(&tp->getlst);
-    while (pc != NULL) {
-        struct piece *next = BTPDQ_NEXT(pc, entry);
-        if (peer_has(p, pc->index) && tp->piece_count[pc->index] == 0)
-            dl_on_peerless_piece(pc);
-        pc = next;
-    }
-#endif
-}
-
-void
-dl_on_block(struct peer *p, struct block_request *req,
-    uint32_t index, uint32_t begin, uint32_t length, const char *data)
-{
-    struct torrent *tp = p->tp;
-    struct block *blk = req->blk;
-    struct piece *pc = blk->pc;
-
-    off_t cbegin = index * p->tp->meta.piece_length + begin;
-    torrent_put_bytes(p->tp, data, cbegin, length);
-
-    set_bit(pc->have_field, begin / PIECE_BLOCKLEN);
-    pc->ngot++;
-
-    if (tp->endgame) {
-        struct block_request *req;
-        struct net_buf *cancel = nb_create_cancel(index, begin, length);
-        nb_hold(cancel);
-        BTPDQ_FOREACH(req, &blk->reqs, blk_entry) {
-            if (req->p != p)
-                peer_cancel(req->p, req, cancel);
-            pc->nreqs--;
-        }
-        nb_drop(cancel);
-        dl_piece_reorder_eg(pc);
-        req = BTPDQ_FIRST(&blk->reqs);
-        while (req !=
 NULL) {
-            struct block_request *next = BTPDQ_NEXT(req, blk_entry);
-            if (peer_leech_ok(req->p) && !peer_laden(req->p))
-                dl_assign_requests_eg(req->p);
-            free(req);
-            req = next;
-        }
-        BTPDQ_INIT(&blk->reqs);
-        if (pc->ngot == pc->nblocks)
-            dl_on_piece(pc);
-    } else {
-        BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
-        free(req);
-        pc->nreqs--;
-        // XXX: Needs to be looked at if we introduce snubbing.
-        clear_bit(pc->down_field, begin / PIECE_BLOCKLEN);
-        pc->nbusy--;
-        if (pc->ngot == pc->nblocks)
-            dl_on_piece(pc);
-        if (peer_leech_ok(p) && !peer_laden(p))
-            dl_assign_requests(p);
-    }
-}
diff --git a/btpd/policy_subr.c b/btpd/policy_subr.c
deleted file mode 100644
index f0429f0..0000000
--- a/btpd/policy_subr.c
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- * The commandments:
- *
- * A peer is wanted except when it only has pieces we've already
- * downloaded or fully requested. Thus, a peer's wanted count is
- * increased for each missing or unfull piece it announces, or
- * when a piece it has becomes unfull.
- *
- * When a peer we want unchokes us, requests will primarily
- * be put on pieces we're already downloading and then on
- * possible new pieces.
- *
- * When choosing between several different new pieces to start
- * downloading, the rarest piece will be chosen.
- *
- * End game mode sets in when all missing blocks are requested.
- * In end game mode no piece is counted as full unless it's
- * downloaded.
- */
-
-#include <fcntl.h>
-#include <math.h>
-#include <string.h>
-#include <unistd.h>
-
-#include <openssl/sha.h>
-
-#include "btpd.h"
-#include "stream.h"
-
-static struct piece *
-piece_alloc(struct torrent *tp, uint32_t index)
-{
-    assert(!has_bit(tp->busy_field, index)
-        && tp->npcs_busy < tp->meta.npieces);
-    struct piece *pc;
-    size_t mem, field, blocks;
-    unsigned nblocks;
-    off_t piece_length = torrent_piece_size(tp, index);
-
-    nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN);
-    blocks = sizeof(pc->blocks[0]) * nblocks;
-    field = (size_t)ceil(nblocks / 8.0);
-    mem = sizeof(*pc) + field + blocks;
-
-    pc = btpd_calloc(1, mem);
-    pc->tp = tp;
-    pc->down_field = (uint8_t *)(pc + 1);
-    pc->have_field =
-        tp->block_field +
-        index * (size_t)ceil(tp->meta.piece_length / (double)(1 << 17));
-
-    pc->index = index;
-    pc->nblocks = nblocks;
-
-    pc->nreqs = 0;
-    pc->next_block = 0;
-
-    for (unsigned i = 0; i < nblocks; i++)
-        if (has_bit(pc->have_field, i))
-            pc->ngot++;
-
-    pc->blocks = (struct block *)(pc->down_field + field);
-    for (unsigned i = 0; i < nblocks; i++) {
-        uint32_t start = i * PIECE_BLOCKLEN;
-        uint32_t len = torrent_block_size(pc, i);
-        struct block *blk = &pc->blocks[i];
-        blk->pc = pc;
-        BTPDQ_INIT(&blk->reqs);
-        blk->msg = nb_create_request(index, start, len);
-        nb_hold(blk->msg);
-    }
-
-    tp->npcs_busy++;
-    set_bit(tp->busy_field, index);
-    BTPDQ_INSERT_HEAD(&tp->getlst, pc, entry);
-    return pc;
-}
-
-void
-piece_free(struct piece *pc)
-{
-    struct torrent *tp = pc->tp;
-    assert(tp->npcs_busy > 0);
-    tp->npcs_busy--;
-    clear_bit(tp->busy_field, pc->index);
-    BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
-    for (unsigned i = 0; i < pc->nblocks; i++) {
-        struct block_request *req = BTPDQ_FIRST(&pc->blocks[i].reqs);
-        while (req != NULL) {
-            struct block_request *next = BTPDQ_NEXT(req, blk_entry);
-            free(req);
-            req = next;
-        }
-        nb_drop(pc->blocks[i].msg);
-    }
-    free(pc);
-}
-
-int
-piece_full(struct piece *pc)
-{
-    return pc->ngot + pc->nbusy == pc->nblocks;
-}
-
-static int
-dl_should_enter_endgame(struct torrent *tp)
-{
-    int should;
-    if (tp->have_npieces + tp->npcs_busy == tp->meta.npieces) {
-        should = 1;
-        struct piece *pc;
-        BTPDQ_FOREACH(pc, &tp->getlst, entry) {
-            if (!piece_full(pc)) {
-                should = 0;
-                break;
-            }
-        }
-    } else
-        should = 0;
-    return should;
-}
-
-static void
-dl_piece_insert_eg(struct piece *pc)
-{
-    struct piece_tq *getlst = &pc->tp->getlst;
-    if (pc->nblocks == pc->ngot)
-        BTPDQ_INSERT_TAIL(getlst, pc, entry);
-    else {
-        unsigned r = pc->nreqs / (pc->nblocks - pc->ngot);
-        struct piece *it;
-        BTPDQ_FOREACH(it, getlst, entry) {
-            if ((it->nblocks == it->ngot
-                    || r < it->nreqs / (it->nblocks - it->ngot))) {
-                BTPDQ_INSERT_BEFORE(it, pc, entry);
-                break;
-            }
-        }
-        if (it == NULL)
-            BTPDQ_INSERT_TAIL(getlst, pc, entry);
-    }
-}
-
-void
-dl_piece_reorder_eg(struct piece *pc)
-{
-    BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
-    dl_piece_insert_eg(pc);
-}
-
-static void
-dl_enter_endgame(struct torrent *tp)
-{
-    struct peer *p;
-    struct piece *pc;
-    struct piece *pcs[tp->npcs_busy];
-    unsigned pi;
-
-    btpd_log(BTPD_L_POL, "Entering end game\n");
-    tp->endgame = 1;
-
-    pi = 0;
-    BTPDQ_FOREACH(pc, &tp->getlst, entry) {
-        for (unsigned i = 0; i < pc->nblocks; i++)
-            clear_bit(pc->down_field, i);
-        pc->nbusy = 0;
-        pcs[pi] = pc;
-        pi++;
-    }
-    BTPDQ_INIT(&tp->getlst);
-    while (pi > 0) {
-        pi--;
-        dl_piece_insert_eg(pcs[pi]);
-    }
-    BTPDQ_FOREACH(p, &tp->peers, p_entry) {
-        assert(p->nwant == 0);
-        BTPDQ_FOREACH(pc, &tp->getlst, entry) {
-            if (peer_has(p, pc->index))
-                peer_want(p, pc->index);
-        }
-        if (p->nwant > 0 && peer_leech_ok(p) && !peer_laden(p))
-            dl_assign_requests_eg(p);
-    }
-}
-
-struct piece *
-dl_find_piece(struct torrent *tp, uint32_t index)
-{
-    struct piece *pc;
-    BTPDQ_FOREACH(pc, &tp->getlst, entry)
-        if (pc->index == index)
-            break;
-    return pc;
-}
-
-static int
-test_hash(struct torrent *tp, uint8_t *hash, unsigned long index)
-{
-    if (tp->meta.piece_hash != NULL)
-        return memcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH);
-    else {
-        char piece_hash[SHA_DIGEST_LENGTH];
-        int fd;
-        int bufi;
-        int err;
-
-        err = vopen(&fd, O_RDONLY, "%s/torrent", tp->relpath);
-        if (err != 0)
-            btpd_err("test_hash: %s\n", strerror(err));
-
-        err = lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH,
-            SEEK_SET);
-        if (err < 0)
-            btpd_err("test_hash: %s\n", strerror(errno));
-
-        bufi = 0;
-        while (bufi < SHA_DIGEST_LENGTH) {
-            ssize_t nread =
-                read(fd, piece_hash + bufi, SHA_DIGEST_LENGTH - bufi);
-            bufi += nread;
-        }
-        close(fd);
-
-        return memcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
-    }
-}
-
-static int
-ro_fd_cb(const char *path, int *fd, void *arg)
-{
-    struct torrent *tp = arg;
-    return vopen(fd, O_RDONLY, "%s/content/%s", tp->relpath, path);
-}
-
-static void
-torrent_test_piece(struct piece *pc)
-{
-    struct torrent *tp = pc->tp;
-    int err;
-    uint8_t hash[20];
-    struct bt_stream_ro *bts;
-    off_t plen = torrent_piece_size(tp, pc->index);
-
-    if ((bts = bts_open_ro(&tp->meta, pc->index * tp->meta.piece_length,
-             ro_fd_cb, tp)) == NULL)
-        btpd_err("Out of memory.\n");
-
-    if ((err = bts_sha(bts, plen, hash)) != 0)
-        btpd_err("Ouch! %s\n", strerror(err));
-
-    bts_close_ro(bts);
-
-    if (test_hash(tp, hash, pc->index) == 0)
-        dl_on_ok_piece(pc);
-    else
-        dl_on_bad_piece(pc);
-}
-
-void
-dl_on_piece(struct piece *pc)
-{
-    torrent_test_piece(pc);
-}
-
-static int
-dl_piece_startable(struct peer *p, uint32_t index)
-{
-    return peer_has(p, index) && !has_bit(p->tp->piece_field, index)
-        && !has_bit(p->tp->busy_field, index);
-}
-
-/*
- * Find the rarest piece the peer has, that isn't already allocated
- * for download or already downloaded.
 If no such piece can be found
- * return ENOENT.
- *
- * Return 0 or ENOENT, index in res.
- */
-static int
-dl_choose_rarest(struct peer *p, uint32_t *res)
-{
-    uint32_t i;
-    struct torrent *tp = p->tp;
-
-    assert(tp->endgame == 0);
-
-    for (i = 0; i < tp->meta.npieces && !dl_piece_startable(p, i); i++)
-        ;
-
-    if (i == tp->meta.npieces)
-        return ENOENT;
-
-    uint32_t min_i = i;
-    uint32_t min_c = 1;
-    for (i++; i < tp->meta.npieces; i++) {
-        if (dl_piece_startable(p, i)) {
-            if (tp->piece_count[i] == tp->piece_count[min_i])
-                min_c++;
-            else if (tp->piece_count[i] < tp->piece_count[min_i]) {
-                min_i = i;
-                min_c = 1;
-            }
-        }
-    }
-    if (min_c > 1) {
-        min_c = 1 + rint((double)random() * (min_c - 1) / RAND_MAX);
-        for (i = min_i; min_c > 0; i++) {
-            if (dl_piece_startable(p, i)
-                    && tp->piece_count[i] == tp->piece_count[min_i]) {
-                min_c--;
-                min_i = i;
-            }
-        }
-    }
-    *res = min_i;
-    return 0;
-}
-
-/*
- * Called from either dl_piece_assign_requests or dl_new_piece,
- * when a pice becomes full. The wanted level of the peers
- * that has this piece will be decreased. This function is
- * the only one that may trigger end game.
- */
-static void
-dl_on_piece_full(struct piece *pc)
-{
-    struct peer *p;
-    BTPDQ_FOREACH(p, &pc->tp->peers, p_entry) {
-        if (peer_has(p, pc->index))
-            peer_unwant(p, pc->index);
-    }
-    if (dl_should_enter_endgame(pc->tp))
-        dl_enter_endgame(pc->tp);
-}
-
-/*
- * Allocate the piece indicated by the index for download.
- * There's a small possibility that a piece is fully downloaded
- * but haven't been tested. If such is the case the piece will
- * be tested and NULL will be returned. Also, we might then enter
- * end game.
- *
- * Return the piece or NULL.
- */
-struct piece *
-dl_new_piece(struct torrent *tp, uint32_t index)
-{
-    btpd_log(BTPD_L_POL, "Started on piece %u.\n", index);
-    struct piece *pc = piece_alloc(tp, index);
-    if (pc->ngot == pc->nblocks) {
-        dl_on_piece_full(pc);
-        dl_on_piece(pc);
-        if (dl_should_enter_endgame(tp))
-            dl_enter_endgame(tp);
-        return NULL;
-    } else
-        return pc;
-}
-
-/*
- * Called when a previously full piece loses a peer.
- * This is needed because we have decreased the wanted
- * level for the peers that have this piece when it got
- * full. Thus we have to increase the wanted level and
- * try to assign requests for this piece.
- */
-void
-dl_on_piece_unfull(struct piece *pc)
-{
-    struct torrent *tp = pc->tp;
-    struct peer *p;
-    assert(!piece_full(pc) && tp->endgame == 0);
-    BTPDQ_FOREACH(p, &tp->peers, p_entry)
-        if (peer_has(p, pc->index))
-            peer_want(p, pc->index);
-    p = BTPDQ_FIRST(&tp->peers);
-    while (p != NULL && !piece_full(pc)) {
-        if (peer_leech_ok(p) && !peer_laden(p))
-            dl_piece_assign_requests(pc, p); // Cannot provoke end game here.
-        p = BTPDQ_NEXT(p, p_entry);
-    }
-}
-
-#define INCNEXTBLOCK(pc) \
-    (pc)->next_block = ((pc)->next_block + 1) % (pc)->nblocks
-
-/*
- * Request as many blocks as possible on this piece from
- * the peer. If the piece becomes full we call dl_on_piece_full.
- *
- * Return the number of requests sent.
- */
-unsigned
-dl_piece_assign_requests(struct piece *pc, struct peer *p)
-{
-    assert(!piece_full(pc) && !peer_laden(p));
-    unsigned count = 0;
-    do {
-        while ((has_bit(pc->have_field, pc->next_block)
-                || has_bit(pc->down_field, pc->next_block)))
-            INCNEXTBLOCK(pc);
-
-        struct block *blk = &pc->blocks[pc->next_block];
-        struct block_request *req = btpd_malloc(sizeof(*req));
-        req->p = p;
-        req->blk = blk;
-        BTPDQ_INSERT_TAIL(&blk->reqs, req, blk_entry);
-
-        peer_request(p, req);
-
-        set_bit(pc->down_field, pc->next_block);
-        pc->nbusy++;
-        pc->nreqs++;
-        count++;
-        INCNEXTBLOCK(pc);
-    } while (!piece_full(pc) && !peer_laden(p));
-
-    if (piece_full(pc))
-        dl_on_piece_full(pc);
-
-    return count;
-}
-
-/*
- * Request as many blocks as possible from the peer. Puts
- * requests on already active pieces before starting on new
- * ones. Care must be taken since end game mode may be triggered
- * by the calls to dl_piece_assign_requests.
- *
- * Returns number of requests sent.
- *
- * XXX: should do something smart when deciding on which
- * already started piece to put requests on.
- */
-unsigned
-dl_assign_requests(struct peer *p)
-{
-    assert(!p->tp->endgame && !peer_laden(p));
-    struct piece *pc;
-    struct torrent *tp = p->tp;
-    unsigned count = 0;
-    BTPDQ_FOREACH(pc, &tp->getlst, entry) {
-        if (piece_full(pc) || !peer_has(p, pc->index))
-            continue;
-        count += dl_piece_assign_requests(pc, p);
-        if (tp->endgame)
-            break;
-        if (!piece_full(pc))
-            assert(peer_laden(p));
-        if (peer_laden(p))
-            break;
-    }
-    while (!peer_laden(p) && !tp->endgame) {
-        uint32_t index;
-        if (dl_choose_rarest(p, &index) == 0) {
-            pc = dl_new_piece(tp, index);
-            if (pc != NULL)
-                count += dl_piece_assign_requests(pc, p);
-        } else
-            break;
-    }
-    return count;
-}
-
-void
-dl_unassign_requests(struct peer *p)
-{
-    while (p->nreqs_out > 0) {
-        struct block_request *req = BTPDQ_FIRST(&p->my_reqs);
-        struct piece *pc = req->blk->pc;
-        int was_full = piece_full(pc);
-
-        while (req != NULL) {
-            struct block_request *next = BTPDQ_NEXT(req, p_entry);
-
-            uint32_t blki = nb_get_begin(req->blk->msg) / PIECE_BLOCKLEN;
-            struct block *blk = req->blk;
-            // XXX: Needs to be looked at if we introduce snubbing.
-            assert(has_bit(pc->down_field, blki));
-            clear_bit(pc->down_field, blki);
-            pc->nbusy--;
-            BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
-            p->nreqs_out--;
-            BTPDQ_REMOVE(&blk->reqs, req, blk_entry);
-            free(req);
-            pc->nreqs--;
-
-            while (next != NULL && next->blk->pc != pc)
-                next = BTPDQ_NEXT(next, p_entry);
-            req = next;
-        }
-
-        if (was_full && !piece_full(pc))
-            dl_on_piece_unfull(pc);
-    }
-    assert(BTPDQ_EMPTY(&p->my_reqs));
-}
-
-static void
-dl_piece_assign_requests_eg(struct piece *pc, struct peer *p)
-{
-    unsigned first_block = pc->next_block;
-    do {
-        if ((has_bit(pc->have_field, pc->next_block)
-                || peer_requested(p, &pc->blocks[pc->next_block]))) {
-            INCNEXTBLOCK(pc);
-            continue;
-        }
-        struct block_request *req = btpd_calloc(1, sizeof(*req));
-        req->blk = &pc->blocks[pc->next_block];
-        req->p = p;
-        BTPDQ_INSERT_TAIL(&pc->blocks[pc->next_block].reqs, req, blk_entry);
-        pc->nreqs++;
-        INCNEXTBLOCK(pc);
-        peer_request(p, req);
-    } while (!peer_laden(p) && pc->next_block != first_block);
-}
-
-void
-dl_assign_requests_eg(struct peer *p)
-{
-    assert(!peer_laden(p));
-    struct torrent *tp = p->tp;
-    struct piece_tq tmp;
-    BTPDQ_INIT(&tmp);
-
-    struct piece *pc = BTPDQ_FIRST(&tp->getlst);
-    while (!peer_laden(p) && pc != NULL) {
-        struct piece *next = BTPDQ_NEXT(pc, entry);
-        if (peer_has(p, pc->index) && pc->nblocks != pc->ngot) {
-            dl_piece_assign_requests_eg(pc, p);
-            BTPDQ_REMOVE(&tp->getlst, pc, entry);
-            BTPDQ_INSERT_HEAD(&tmp, pc, entry);
-        }
-        pc = next;
-    }
-
-    pc = BTPDQ_FIRST(&tmp);
-    while (pc != NULL) {
-        struct piece *next = BTPDQ_NEXT(pc, entry);
-        dl_piece_insert_eg(pc);
-        pc = next;
-    }
-}
-
-void
-dl_unassign_requests_eg(struct peer *p)
-{
-    struct block_request *req;
-    struct piece *pc;
-    struct piece_tq tmp;
-    BTPDQ_INIT(&tmp);
-
-    while (p->nreqs_out > 0) {
-        req = BTPDQ_FIRST(&p->my_reqs);
-
-        pc = req->blk->pc;
-        BTPDQ_REMOVE(&pc->tp->getlst, pc, entry);
-        BTPDQ_INSERT_HEAD(&tmp, pc, entry);
-
-        while (req != NULL) {
-            struct block_request *next = BTPDQ_NEXT(req, p_entry);
-            BTPDQ_REMOVE(&p->my_reqs, req, p_entry);
-            p->nreqs_out--;
-            BTPDQ_REMOVE(&req->blk->reqs, req, blk_entry);
-            free(req);
-            pc->nreqs--;
-
-            while (next != NULL && next->blk->pc != pc)
-                next = BTPDQ_NEXT(next, p_entry);
-            req = next;
-        }
-    }
-    assert(BTPDQ_EMPTY(&p->my_reqs));
-
-    pc = BTPDQ_FIRST(&tmp);
-    while (pc != NULL) {
-        struct piece *next = BTPDQ_NEXT(pc, entry);
-        dl_piece_insert_eg(pc);
-        pc = next;
-    }
-}
diff --git a/btpd/torrent.h b/btpd/torrent.h
index f535b4c..5e0b935 100644
--- a/btpd/torrent.h
+++ b/btpd/torrent.h
@@ -52,10 +52,6 @@ struct torrent {
 
     uint64_t uploaded, downloaded;
 
-    unsigned long choke_time;
-    unsigned long opt_time;
-    unsigned long tracker_time;
-
     short ndown;
     struct peer *optimistic;
diff --git a/btpd/tracker_req.c b/btpd/tracker_req.c
index 7f0a74b..32ad5a1 100644
--- a/btpd/tracker_req.c
+++ b/btpd/tracker_req.c
@@ -100,7 +100,7 @@ tracker_done(pid_t pid, void *arg)
         goto out;
     }
 
-    tp->tracker_time = btpd_seconds + interval;
+    //tp->tracker_time = btpd_seconds + interval;
 
     int error = 0;
     size_t length;
@@ -134,7 +134,7 @@ out:
             "Start request failed for %s.\n", tp->relpath);
         torrent_unload(tp);
     } else
-        tp->tracker_time = btpd_seconds + 10;
+        ;//tp->tracker_time = btpd_seconds + 10;
     }
     munmap(req->res, REQ_SIZE);
     free(req);
diff --git a/btpd/upload.c b/btpd/upload.c
new file mode 100644
index 0000000..bc3af75
--- /dev/null
+++ b/btpd/upload.c
@@ -0,0 +1,72 @@
+
+#include "btpd.h"
+
+static struct event m_choke_timer;
+static unsigned m_npeers;
+static struct
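Note: upload.c above drives the rechoke with the usual libevent-1.x
periodic-timer idiom: evtimer_set() binds the callback once, and the
callback's first action is to re-arm the timer with evtimer_add(), since a
timer event fires only once per add. A distilled stand-alone version of the
pattern (hypothetical names; compile with -levent):

    #include <stdio.h>
    #include <event.h>

    static struct event m_timer;

    static void
    tick_cb(int fd, short type, void *arg)
    {
        /* Re-arm first so a slow body cannot delay the next cycle. */
        evtimer_add(&m_timer, (& (struct timeval) { 10, 0 }));
        printf("tick\n");
    }

    int
    main(void)
    {
        event_init();
        evtimer_set(&m_timer, tick_cb, NULL);
        evtimer_add(&m_timer, (& (struct timeval) { 10, 0 }));
        return event_dispatch();
    }

With the per-torrent heartbeat gone, this one global timer replaces the
removed choke_time/opt_time bookkeeping that btpd_seconds used to drive.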
 peer_tq m_peerq = BTPDQ_HEAD_INITIALIZER(m_peerq);
+
+static void
+choke_do(void)
+{
+    struct peer *p;
+    BTPDQ_FOREACH(p, &m_peerq, ul_entry)
+        if (p->flags & PF_I_CHOKE)
+            peer_unchoke(p);
+}
+
+static void
+choke_cb(int sd, short type, void *arg)
+{
+    evtimer_add(&m_choke_timer, (& (struct timeval) { 10, 0 }));
+    choke_do();
+}
+
+void
+ul_on_new_peer(struct peer *p)
+{
+    m_npeers++;
+    BTPDQ_INSERT_HEAD(&m_peerq, p, ul_entry);
+    choke_do();
+}
+
+void
+ul_on_lost_peer(struct peer *p)
+{
+    assert(m_npeers > 0);
+    BTPDQ_REMOVE(&m_peerq, p, ul_entry);
+    m_npeers--;
+    if ((p->flags & (PF_P_WANT|PF_I_CHOKE)) == PF_P_WANT)
+        choke_do();
+}
+
+void
+ul_on_lost_torrent(struct torrent *tp)
+{
+    struct peer *p;
+    BTPDQ_FOREACH(p, &tp->peers, p_entry) {
+        BTPDQ_REMOVE(&m_peerq, p, ul_entry);
+        m_npeers--;
+    }
+    choke_do();
+}
+
+void
+ul_on_interest(struct peer *p)
+{
+    if ((p->flags & PF_I_CHOKE) == 0)
+        choke_do();
+}
+
+void
+ul_on_uninterest(struct peer *p)
+{
+    if ((p->flags & PF_I_CHOKE) == 0)
+        choke_do();
+}
+
+void
+ul_init(void)
+{
+    evtimer_set(&m_choke_timer, choke_cb, NULL);
+    evtimer_add(&m_choke_timer, (& (struct timeval) { 10, 0 }));
+}
diff --git a/btpd/upload.h b/btpd/upload.h
new file mode 100644
index 0000000..49956ff
--- /dev/null
+++ b/btpd/upload.h
@@ -0,0 +1,11 @@
+#ifndef BTPD_UPLOAD_H
+#define BTPD_UPLOAD_H
+
+void ul_on_new_peer(struct peer *p);
+void ul_on_lost_peer(struct peer *p);
+void ul_on_lost_torrent(struct torrent *tp);
+void ul_on_interest(struct peer *p);
+void ul_on_uninterest(struct peer *p);
+void ul_init(void);
+
+#endif
-- 
cgit 1.4.1