about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard Nyberg <rnyberg@murmeldjur.se>2006-01-22 23:10:29 +0000
committerRichard Nyberg <rnyberg@murmeldjur.se>2006-01-22 23:10:29 +0000
commit7068f34a51847c08567dee3861205a0363c81243 (patch)
treeb1eb582e4cd044ed1b7efb53a13564a251163884
parent476765a7a7809185464067fe19b4b88f6c16812e (diff)
downloadbtpd-7068f34a51847c08567dee3861205a0363c81243.tar.gz
btpd-7068f34a51847c08567dee3861205a0363c81243.zip
* Implemented the full cm_ life cycle.
* Added fast resume support. A resume file is loaded when a torrent is started
  and saved when it's stopped or done. If no resume file is found or the file
  information doesn't match whats on disk, the content is tested for existing
  pieces.
* cm_prealloc now can allocate several adjacent pieces to the given piece.
  This further reduces fragmentation. How many pieces are allocated at a
  time is controlled by cm_alloc_size which can be set by the user with
  the --prealloc option.
* Some changes were also made to the torrent api.

-rw-r--r--btpd/btpd.c2
-rw-r--r--btpd/content.c671
-rw-r--r--btpd/download_subr.c5
-rw-r--r--btpd/torrent.c45
-rw-r--r--btpd/torrent.h17
5 files changed, 569 insertions, 171 deletions
diff --git a/btpd/btpd.c b/btpd/btpd.c
index 1e10415..1ecbae8 100644
--- a/btpd/btpd.c
+++ b/btpd/btpd.c
@@ -115,7 +115,7 @@ load_library(void)
     for (int i = 0; i < ne; i++) {
         struct torrent *tp;
         struct dirent *e = entries[i];
-        if (torrent_create(&tp, e->d_name) == 0)
+        if (torrent_load(&tp, e->d_name) == 0)
             btpd_add_torrent(tp);
         free(e);
     }
diff --git a/btpd/content.c b/btpd/content.c
index 8dbc994..7a3bd96 100644
--- a/btpd/content.c
+++ b/btpd/content.c
@@ -1,6 +1,10 @@
+#include <sys/types.h>
+#include <sys/stat.h>
+
 #include <fcntl.h>
 #include <math.h>
 #include <pthread.h>
+#include <stdio.h>
 #include <string.h>
 #include <unistd.h>
 
@@ -9,24 +13,27 @@
 #include "btpd.h"
 #include "stream.h"
 
-struct data {
+struct cm_write_data {
     uint32_t begin;
     uint8_t *buf;
     size_t len;
-    BTPDQ_ENTRY(data) entry;
+    BTPDQ_ENTRY(cm_write_data) entry;
 };
 
-BTPDQ_HEAD(data_tq, data);
+BTPDQ_HEAD(cm_write_data_tq, cm_write_data);
 
 enum cm_op_type {
     CM_ALLOC,
+    CM_SAVE,
+    CM_START,
     CM_TEST,
     CM_WRITE
 };
 
 struct cm_op {
     struct torrent *tp;
-
+    int error;
+    int received;
     enum cm_op_type type;
     union {
         struct {
@@ -34,6 +41,9 @@ struct cm_op {
             uint32_t pos;
         } alloc;
         struct {
+            volatile sig_atomic_t cancel;
+        } start;
+        struct {
             uint32_t piece;
             uint32_t pos;
             int ok;
@@ -41,13 +51,10 @@ struct cm_op {
         struct {
             uint32_t piece;
             uint32_t pos;
-            struct data_tq q;
+            struct cm_write_data_tq q;
         } write;
     } u;
 
-    int error;
-    char *errmsg;
-
     BTPDQ_ENTRY(cm_op) cm_entry;
     BTPDQ_ENTRY(cm_op) td_entry;
 };
@@ -55,6 +62,8 @@ struct cm_op {
 BTPDQ_HEAD(cm_op_tq, cm_op);
 
 struct content {
+    int active;
+
     uint32_t npieces_got;
 
     off_t ncontent_bytes;
@@ -74,10 +83,14 @@ struct content {
 
 #define ZEROBUFLEN (1 << 14)
 
+struct cm_comm {
+    struct cm_op_tq q;
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+};
+
+static struct cm_comm m_long_comm, m_short_comm;
 static const uint8_t m_zerobuf[ZEROBUFLEN];
-static struct cm_op_tq m_tdq = BTPDQ_HEAD_INITIALIZER(m_tdq);
-static pthread_mutex_t m_tdq_lock;
-static pthread_cond_t m_tdq_cond;
 
 static int
 fd_cb_rd(const char *path, int *fd, void *arg)
@@ -95,6 +108,27 @@ fd_cb_wr(const char *path, int *fd, void *arg)
 }
 
 static void
+cm_td_post_common(struct cm_comm *comm, struct cm_op *op)
+{
+    pthread_mutex_lock(&comm->lock);
+    BTPDQ_INSERT_TAIL(&comm->q, op, td_entry);
+    pthread_mutex_unlock(&comm->lock);
+    pthread_cond_signal(&comm->cond);
+}
+
+static void
+cm_td_post_long(struct cm_op *op)
+{
+    cm_td_post_common(&m_long_comm, op);
+}
+
+static void
+cm_td_post_short(struct cm_op *op)
+{
+    cm_td_post_common(&m_short_comm, op);
+}
+
+static void
 run_todo(struct content *cm)
 {
     struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
@@ -107,10 +141,65 @@ run_todo(struct content *cm)
         return;
     }
 
-    pthread_mutex_lock(&m_tdq_lock);
-    BTPDQ_INSERT_TAIL(&m_tdq, op, td_entry);
-    pthread_mutex_unlock(&m_tdq_lock);
-    pthread_cond_signal(&m_tdq_cond);
+    if (op->type != CM_START)
+        cm_td_post_short(op);
+    else
+        cm_td_post_long(op);
+}
+
+static void
+add_todo(struct content *cm, struct cm_op *op)
+{
+    int was_empty = BTPDQ_EMPTY(&cm->todoq);
+    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
+    if (was_empty)
+        run_todo(cm);
+}
+
+void
+cm_destroy(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+    bts_close(cm->rds);
+    free(cm->piece_field);
+    free(cm->block_field);
+    free(cm->hold_field);
+    free(cm->pos_field);
+    tp->cm = NULL;
+    torrent_on_cm_stopped(tp);
+}
+
+void
+cm_save(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+    struct cm_op *op = btpd_calloc(1, sizeof(*op));
+    op->tp = tp;
+    op->type = CM_SAVE;
+    add_todo(cm, op);
+}
+
+void
+cm_stop(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+
+    struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
+    if (op != NULL && op->type == CM_START) {
+        pthread_mutex_lock(&m_long_comm.lock);
+        if (op->received)
+            op->u.start.cancel = 1;
+        else {
+            BTPDQ_REMOVE(&m_long_comm.q, op, td_entry);
+            BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
+            free(op);
+        }
+        pthread_mutex_unlock(&m_long_comm.lock);
+    } else if (cm->npieces_got < tp->meta.npieces)
+        cm_save(tp);
+
+    if (BTPDQ_EMPTY(&cm->todoq))
+        cm_destroy(tp);
 }
 
 static void
@@ -121,19 +210,28 @@ cm_td_cb(void *arg)
     struct content *cm = tp->cm;
 
     if (op->error)
-        btpd_err("%s", op->errmsg);
+        btpd_err("IO error for %s.\n", tp->relpath);
 
     switch (op->type) {
     case CM_ALLOC:
         set_bit(cm->pos_field, op->u.alloc.pos);
         clear_bit(cm->hold_field, op->u.alloc.piece);
         break;
+    case CM_START:
+        if (cm->active) {
+            assert(!op->u.start.cancel);
+            torrent_on_cm_started(tp);
+        }
+        break;
     case CM_TEST:
         if (op->u.test.ok) {
+            assert(cm->npieces_got < tp->meta.npieces);
             cm->npieces_got++;
             set_bit(cm->piece_field, op->u.test.piece);
             if (tp->net != NULL)
                 dl_on_ok_piece(op->tp->net, op->u.test.piece);
+            if (cm_full(tp))
+                cm_save(tp);
         } else {
             cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
             bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
@@ -141,116 +239,16 @@ cm_td_cb(void *arg)
                 dl_on_bad_piece(tp->net, op->u.test.piece);
         }
         break;
-    default:
+    case CM_SAVE:
+    case CM_WRITE:
         break;
     }
     BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
     free(op);
     if (!BTPDQ_EMPTY(&cm->todoq))
         run_todo(cm);
-}
-
-static int
-test_hash(struct torrent *tp, uint8_t *hash, uint32_t index)
-{
-    if (tp->meta.piece_hash != NULL)
-        return bcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH);
-    else {
-        char piece_hash[SHA_DIGEST_LENGTH];
-        int fd;
-        int err;
-
-        err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath);
-        if (err != 0)
-            btpd_err("test_hash: %s\n", strerror(err));
-
-        lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH, SEEK_SET);
-        read(fd, piece_hash, SHA_DIGEST_LENGTH);
-        close(fd);
-
-        return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
-    }
-}
-
-static void
-cm_td_alloc(struct cm_op *op)
-{
-    struct bt_stream *bts;
-    off_t len = torrent_piece_size(op->tp, op->u.alloc.pos);
-    off_t off = op->tp->meta.piece_length * op->u.alloc.pos;
-    bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
-    while (len > 0) {
-        size_t wlen = min(ZEROBUFLEN, len);
-        bts_put(bts, off, m_zerobuf, wlen);
-        len -= wlen;
-        off += wlen;
-    }
-    bts_close(bts);
-}
-
-static void
-cm_td_test(struct cm_op *op)
-{
-    uint8_t hash[SHA_DIGEST_LENGTH];
-    struct bt_stream *bts;
-    bts_open(&bts, &op->tp->meta, fd_cb_rd, op->tp);
-    bts_sha(bts, op->u.test.pos * op->tp->meta.piece_length,
-        torrent_piece_size(op->tp, op->u.test.piece), hash);
-    bts_close(bts);
-    op->u.test.ok = test_hash(op->tp, hash, op->u.test.piece) == 0;
-}
-
-static void
-cm_td_write(struct cm_op *op)
-{
-    struct data *d, *next;
-    off_t base = op->tp->meta.piece_length * op->u.write.pos;
-    struct bt_stream *bts;
-    bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
-    BTPDQ_FOREACH(d, &op->u.write.q, entry)
-        bts_put(bts, base + d->begin, d->buf, d->len);
-    bts_close(bts);
-    BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
-        free(d);
-}
-
-static void
-cm_td(void *arg)
-{
-    for (;;) {
-        pthread_mutex_lock(&m_tdq_lock);
-        while (BTPDQ_EMPTY(&m_tdq))
-            pthread_cond_wait(&m_tdq_cond, &m_tdq_lock);
-        struct cm_op *op = BTPDQ_FIRST(&m_tdq);
-        BTPDQ_REMOVE(&m_tdq, op, td_entry);
-        pthread_mutex_unlock(&m_tdq_lock);
-
-        switch (op->type) {
-        case CM_ALLOC:
-            cm_td_alloc(op);
-            break;
-        case CM_TEST:
-            cm_td_test(op);
-            break;
-        case CM_WRITE:
-            cm_td_write(op);
-            break;
-        default:
-            abort();
-        }
-        td_post_begin();
-        td_post(cm_td_cb, op);
-        td_post_end();
-    }
-}
-
-void
-cm_init(void)
-{
-    pthread_t td;
-    pthread_mutex_init(&m_tdq_lock, NULL);
-    pthread_cond_init(&m_tdq_cond, NULL);
-    pthread_create(&td, NULL, (void *(*)(void *))cm_td, NULL);
+    else if (!cm->active)
+        cm_destroy(tp);
 }
 
 int
@@ -259,6 +257,7 @@ cm_start(struct torrent *tp)
     int err;
     struct content *cm = btpd_calloc(1, sizeof(*cm));
     size_t pfield_size = ceil(tp->meta.npieces / 8.0);
+    cm->active = 1;
     cm->bppbf = ceil((double)tp->meta.piece_length / (1 << 17));
     cm->piece_field = btpd_calloc(pfield_size, 1);
     cm->hold_field = btpd_calloc(pfield_size, 1);
@@ -269,11 +268,12 @@ cm_start(struct torrent *tp)
 
     if ((err = bts_open(&cm->rds, &tp->meta, fd_cb_rd, tp)) != 0)
         btpd_err("Error opening stream (%s).\n", strerror(err));
-    if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
-        btpd_err("Error opening stream (%s).\n", strerror(err));
-
     tp->cm = cm;
-    torrent_cm_cb(tp, CM_STARTED);
+
+    struct cm_op *op = btpd_calloc(1, sizeof(*op));
+    op->tp = tp;
+    op->type = CM_START;
+    add_todo(cm, op);
     return 0;
 }
 
@@ -289,22 +289,18 @@ cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
     return 0;
 }
 
-void
-cm_prealloc(struct torrent *tp, uint32_t piece)
+static void
+cm_post_alloc(struct torrent *tp, uint32_t piece)
 {
     struct content *cm = tp->cm;
-    if (has_bit(cm->pos_field, piece))
-        return;
-
     set_bit(cm->hold_field, piece);
 
-    int was_empty = BTPDQ_EMPTY(&cm->todoq);
     struct cm_op *op = btpd_calloc(1, sizeof(*op));
     op->tp = tp;
     op->type = CM_ALLOC;
     op->u.alloc.piece = piece;
     op->u.alloc.pos = piece;
-    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
+    add_todo(cm, op);
 
     op = btpd_calloc(1, sizeof(*op));
     op->tp = tp;
@@ -312,25 +308,40 @@ cm_prealloc(struct torrent *tp, uint32_t piece)
     op->u.write.piece = piece;
     op->u.write.pos = piece;
     BTPDQ_INIT(&op->u.write.q);
-    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
+    add_todo(cm, op);
+}
 
-    if (was_empty)
-        run_todo(cm);
+void
+cm_prealloc(struct torrent *tp, uint32_t piece)
+{
+    struct content *cm = tp->cm;
+
+    if (cm_alloc_size == 0)
+        set_bit(cm->pos_field, piece);
+    else {
+        unsigned npieces = ceil((double)cm_alloc_size / tp->meta.piece_length);
+        uint32_t start = piece - piece % npieces;
+        uint32_t end = min(start + npieces, tp->meta.npieces);
+
+        while (start < end) {
+            if ((!has_bit(cm->pos_field, start)
+                    && !has_bit(cm->hold_field, start)))
+                cm_post_alloc(tp, start);
+            start++;
+        }
+    }
 }
 
 void
 cm_test_piece(struct torrent *tp, uint32_t piece)
 {
     struct content *cm = tp->cm;
-    int was_empty = BTPDQ_EMPTY(&cm->todoq);
     struct cm_op *op = btpd_calloc(1, sizeof(*op));
     op->tp = tp;
     op->type = CM_TEST;
     op->u.test.piece = piece;
     op->u.test.pos = piece;
-    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
-    if (was_empty)
-        run_todo(cm);
+    add_todo(cm, op);
 }
 
 int
@@ -341,7 +352,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
     struct content *cm = tp->cm;
 
     if (has_bit(cm->hold_field, piece)) {
-        struct data *d = btpd_calloc(1, sizeof(*d) + len);
+        struct cm_write_data *d = btpd_calloc(1, sizeof(*d) + len);
         d->begin = begin;
         d->len = len;
         d->buf = (uint8_t *)(d + 1);
@@ -350,7 +361,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
         BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
             if (op->type == CM_WRITE && op->u.write.piece == piece)
                 break;
-        struct data *it;
+        struct cm_write_data *it;
         BTPDQ_FOREACH(it, &op->u.write.q, entry)
             if (it->begin > begin) {
                 BTPDQ_INSERT_BEFORE(it, d, entry);
@@ -407,3 +418,383 @@ cm_has_piece(struct torrent *tp, uint32_t piece)
 {
     return has_bit(tp->cm->piece_field, piece);
 }
+
+static int
+test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
+{
+    if (tp->meta.piece_hash != NULL)
+        return bcmp(hash, tp->meta.piece_hash[piece], SHA_DIGEST_LENGTH);
+    else {
+        char piece_hash[SHA_DIGEST_LENGTH];
+        int fd;
+        int err;
+
+        err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath);
+        if (err != 0)
+            btpd_err("test_hash: %s\n", strerror(err));
+
+        lseek(fd, tp->meta.pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
+        read(fd, piece_hash, SHA_DIGEST_LENGTH);
+        close(fd);
+
+        return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
+    }
+}
+
+static int
+test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok)
+{
+    int err;
+    uint8_t hash[SHA_DIGEST_LENGTH];
+    struct bt_stream *bts;
+    if ((err = bts_open(&bts, &tp->meta, fd_cb_rd, tp)) != 0)
+        return err;
+    if ((err = bts_sha(bts, pos * tp->meta.piece_length,
+             torrent_piece_size(tp, piece), hash)) != 0)
+        return err;;
+    bts_close(bts);
+    *ok = test_hash(tp, hash, piece) == 0;
+    return 0;
+}
+
+static void
+cm_td_alloc(struct cm_op *op)
+{
+    struct torrent *tp = op->tp;
+    struct content *cm = tp->cm;
+    uint32_t pos = op->u.alloc.pos;
+    struct bt_stream *bts;
+    int err;
+
+    assert(!has_bit(cm->pos_field, pos));
+
+    if ((err = bts_open(&bts, &tp->meta, fd_cb_wr, tp)) != 0)
+        goto out;
+
+    off_t len = torrent_piece_size(tp, pos);
+    off_t off = tp->meta.piece_length * pos;
+    while (len > 0) {
+        size_t wlen = min(ZEROBUFLEN, len);
+        if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) {
+            bts_close(bts);
+            goto out;
+        }
+        len -= wlen;
+        off += wlen;
+    }
+    err = bts_close(bts);
+out:
+    if (err != 0)
+        op->error = 1;
+}
+
+static int
+test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel)
+{
+    int err;
+    FILE *fp;
+    uint8_t (*hashes)[SHA_DIGEST_LENGTH];
+    uint8_t hash[SHA_DIGEST_LENGTH];
+
+    if ((err = vfopen(&fp, "r", "library/%s/torrent", tp->relpath)) != 0)
+        return err;
+
+    hashes = btpd_malloc(tp->meta.npieces * SHA_DIGEST_LENGTH);
+    fseek(fp, tp->meta.pieces_off, SEEK_SET);
+    fread(hashes, SHA_DIGEST_LENGTH, tp->meta.npieces, fp);
+    fclose(fp);
+
+    tp->meta.piece_hash = hashes;
+
+    struct content *cm = tp->cm;
+    for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
+        if (!has_bit(cm->pos_field, piece))
+            continue;
+        err = bts_sha(cm->rds, piece * tp->meta.piece_length,
+            torrent_piece_size(tp, piece), hash);
+        if (err != 0)
+            break;
+        if (test_hash(tp, hash, piece) == 0)
+            set_bit(tp->cm->piece_field, piece);
+        if (*cancel) {
+            err = EINTR;
+            break;
+        }
+    }
+
+    tp->meta.piece_hash = NULL;
+    free(hashes);
+    return err;
+}
+
+int
+stat_and_adjust(struct torrent *tp, struct stat ret[])
+{
+    char path[PATH_MAX];
+    for (int i = 0; i < tp->meta.nfiles; i++) {
+        snprintf(path, PATH_MAX, "library/%s/content/%s", tp->relpath,
+            tp->meta.files[i].path);
+again:
+        if (stat(path, &ret[i]) == -1) {
+            if (errno == ENOENT) {
+                ret[i].st_mtime = -1;
+                ret[i].st_size = -1;
+            } else
+                return errno;
+        }
+        if (ret[i].st_size > tp->meta.files[i].length) {
+            if (truncate(path, tp->meta.files[i].length) != 0)
+                return errno;
+            goto again;
+        }
+    }
+    return 0;
+}
+
+static int
+load_resume(struct torrent *tp, struct stat sbs[])
+{
+    int err, ver;
+    FILE *fp;
+    size_t pfsiz = ceil(tp->meta.npieces / 8.0);
+    size_t bfsiz = tp->meta.npieces * tp->cm->bppbf;
+
+    if ((err = vfopen(&fp, "r" , "library/%s/resume", tp->relpath)) != 0)
+        return err;
+
+    if (fscanf(fp, "%d\n", &ver) != 1)
+        goto invalid;
+    if (ver != 1)
+        goto invalid;
+    for (int i = 0; i < tp->meta.nfiles; i++) {
+        long long size;
+        time_t time;
+        if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
+            goto invalid;
+        if (sbs[i].st_size != size || sbs[i].st_mtime != time)
+            goto invalid;
+    }
+    if (fread(tp->cm->piece_field, 1, pfsiz, fp) != pfsiz)
+        goto invalid;
+    if (fread(tp->cm->block_field, 1, bfsiz, fp) != bfsiz)
+        goto invalid;
+    fclose(fp);
+    return 0;
+invalid:
+    fclose(fp);
+    bzero(tp->cm->piece_field, pfsiz);
+    bzero(tp->cm->block_field, bfsiz);
+    return EINVAL;
+}
+
+static int
+save_resume(struct torrent *tp)
+{
+    int err;
+    FILE *fp;
+    struct stat sbs[tp->meta.nfiles];
+    if ((err = stat_and_adjust(tp, sbs)) != 0)
+        return err;
+    if ((err = vfopen(&fp, "wb", "library/%s/resume", tp->relpath)) != 0)
+        return err;
+    fprintf(fp, "%d\n", 1);
+    for (int i = 0; i < tp->meta.nfiles; i++)
+        fprintf(fp, "%qd %ld\n", (long long)sbs[i].st_size, sbs[i].st_mtime);
+    fwrite(tp->cm->piece_field, 1, ceil(tp->meta.npieces / 8.0), fp);
+    fwrite(tp->cm->block_field, 1, tp->meta.npieces * tp->cm->bppbf, fp);
+    if (fclose(fp) != 0)
+        err = errno;
+    return err;
+}
+
+static void
+cm_td_save(struct cm_op *op)
+{
+    int err;
+    struct torrent *tp = op->tp;
+    struct content *cm = tp->cm;
+    struct bt_stream *bts = cm->wrs;
+
+    cm->wrs = NULL;
+    if ((err = bts_close(bts)) != 0)
+        goto out;
+
+    for (int i = 0; i < tp->meta.nfiles; i++) {
+        int lerr;
+        if ((lerr = vfsync("library/%s/content/%s", tp->relpath,
+                 tp->meta.files[i])) != 0 && lerr != ENOENT)
+            err = lerr;
+    }
+    if (err != 0)
+        goto out;
+    save_resume(tp);
+out:
+    if (err != 0)
+        op->error = 1;
+}
+
+static void
+cm_td_start(struct cm_op *op)
+{
+    int err;
+    struct stat sbs[op->tp->meta.nfiles];
+    struct torrent *tp = op->tp;
+    struct content *cm = tp->cm;
+
+    if ((err = stat_and_adjust(op->tp, sbs)) != 0)
+        goto out;
+
+    if (load_resume(tp, sbs) != 0) {
+        memset(cm->pos_field, 0xff, ceil(tp->meta.npieces / 8.0));
+        off_t off = 0;
+        for (int i = 0; i < tp->meta.nfiles; i++) {
+            if (sbs[i].st_size == -1 || sbs[i].st_size == 0) {
+                uint32_t start = off / tp->meta.piece_length;
+                uint32_t end = (off + tp->meta.files[i].length - 1) /
+                    tp->meta.piece_length;
+                while (start <= end) {
+                    clear_bit(cm->pos_field, start);
+                    start++;
+                }
+            } else if (sbs[i].st_size < tp->meta.files[i].length) {
+                uint32_t start = (off + sbs[i].st_size) /
+                    tp->meta.piece_length;
+                uint32_t end = (off + tp->meta.files[i].length - 1) /
+                    tp->meta.piece_length;
+                while (start <= end) {
+                    clear_bit(cm->pos_field, start);
+                    start++;
+                }
+            }
+            off += tp->meta.files[i].length;
+        }
+        if (op->u.start.cancel)
+            goto out;
+        if ((err = test_torrent(tp, &op->u.start.cancel)) != 0)
+            goto out;
+        save_resume(tp);
+    }
+
+    bzero(cm->pos_field, ceil(tp->meta.npieces / 8.0));
+    for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
+        if (cm_has_piece(tp, piece)) {
+            cm->ncontent_bytes += torrent_piece_size(tp, piece);
+            cm->npieces_got++;
+            set_bit(cm->pos_field, piece);
+            continue;
+        }
+        uint8_t *bf = cm->block_field + cm->bppbf * piece;
+        uint32_t nblocks = torrent_piece_blocks(tp, piece);
+        uint32_t nblocks_got = 0;
+        for (uint32_t i = 0; i < nblocks; i++) {
+            if (has_bit(bf, i)) {
+                nblocks_got++;
+                cm->ncontent_bytes +=
+                    torrent_block_size(tp, piece, nblocks, i);
+            }
+        }
+        if (nblocks_got == nblocks) {
+            int ok;
+            if (((err = test_piece(tp, piece, piece, &ok)) != 0
+                    || op->u.start.cancel))
+                goto out;
+            if (ok) {
+                set_bit(cm->pos_field, piece);
+                set_bit(cm->piece_field, piece);
+            } else
+                bzero(bf, cm->bppbf);
+        } else if (nblocks_got > 0)
+            set_bit(cm->pos_field, piece);
+    }
+
+    if (cm->npieces_got < tp->meta.npieces)
+        if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
+            goto out;
+out:
+    if (!op->u.start.cancel && err != 0)
+        op->error = 1;
+}
+
+static void
+cm_td_test(struct cm_op *op)
+{
+    if (test_piece(op->tp, op->u.test.pos, op->u.test.piece,
+            &op->u.test.ok) != 0)
+        op->error = 1;
+}
+
+static void
+cm_td_write(struct cm_op *op)
+{
+    int err;
+    struct cm_write_data *d, *next;
+    off_t base = op->tp->meta.piece_length * op->u.write.pos;
+    struct bt_stream *bts;
+    if ((err = bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp)) != 0)
+        goto out;
+    BTPDQ_FOREACH(d, &op->u.write.q, entry)
+        if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) {
+            bts_close(bts);
+            goto out;
+        }
+    err = bts_close(bts);
+out:
+    BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
+        free(d);
+    if (err)
+        op->error = 1;
+}
+
+static void
+cm_td(void *arg)
+{
+    struct cm_comm *comm = arg;
+    struct cm_op *op;
+    for (;;) {
+        pthread_mutex_lock(&comm->lock);
+        while (BTPDQ_EMPTY(&comm->q))
+            pthread_cond_wait(&comm->cond, &comm->lock);
+
+        op = BTPDQ_FIRST(&comm->q);
+        BTPDQ_REMOVE(&comm->q, op, td_entry);
+        op->received = 1;
+        pthread_mutex_unlock(&comm->lock);
+
+        switch (op->type) {
+        case CM_ALLOC:
+            cm_td_alloc(op);
+            break;
+        case CM_SAVE:
+            cm_td_save(op);
+            break;
+        case CM_START:
+            cm_td_start(op);
+            break;
+        case CM_TEST:
+            cm_td_test(op);
+            break;
+        case CM_WRITE:
+            cm_td_write(op);
+            break;
+        default:
+            abort();
+        }
+        td_post_begin();
+        td_post(cm_td_cb, op);
+        td_post_end();
+    }
+}
+
+void
+cm_init(void)
+{
+    pthread_t td;
+    BTPDQ_INIT(&m_long_comm.q);
+    pthread_mutex_init(&m_long_comm.lock, NULL);
+    pthread_cond_init(&m_long_comm.cond, NULL);
+    pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm);
+    BTPDQ_INIT(&m_short_comm.q);
+    pthread_mutex_init(&m_short_comm.lock, NULL);
+    pthread_cond_init(&m_short_comm.cond, NULL);
+    pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm);
+}
diff --git a/btpd/download_subr.c b/btpd/download_subr.c
index 9164e72..df2aaaf 100644
--- a/btpd/download_subr.c
+++ b/btpd/download_subr.c
@@ -37,9 +37,8 @@ piece_alloc(struct net *n, uint32_t index)
     struct piece *pc;
     size_t mem, field, blocks;
     unsigned nblocks;
-    off_t piece_length = torrent_piece_size(n->tp, index);
 
-    nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN);
+    nblocks = torrent_piece_blocks(n->tp, index);
     blocks = sizeof(pc->blocks[0]) * nblocks;
     field = (size_t)ceil(nblocks / 8.0);
     mem = sizeof(*pc) + field + blocks;
@@ -63,7 +62,7 @@ piece_alloc(struct net *n, uint32_t index)
     pc->blocks = (struct block *)(pc->down_field + field);
     for (unsigned i = 0; i < nblocks; i++) {
         uint32_t start = i * PIECE_BLOCKLEN;
-        uint32_t len = torrent_block_size(pc, i);
+        uint32_t len = torrent_block_size(n->tp, index, nblocks, i);
         struct block *blk = &pc->blocks[i];
         blk->pc = pc;
         BTPDQ_INIT(&blk->reqs);
diff --git a/btpd/torrent.c b/btpd/torrent.c
index 909b18b..c362ad5 100644
--- a/btpd/torrent.c
+++ b/btpd/torrent.c
@@ -31,13 +31,20 @@ torrent_piece_size(struct torrent *tp, uint32_t index)
 }
 
 uint32_t
-torrent_block_size(struct piece *pc, uint32_t index)
+torrent_piece_blocks(struct torrent *tp, uint32_t piece)
 {
-    if (index < pc->nblocks - 1)
+    return ceil(torrent_piece_size(tp, piece) / (double)PIECE_BLOCKLEN);
+}
+
+uint32_t
+torrent_block_size(struct torrent *tp, uint32_t piece, uint32_t nblocks,
+    uint32_t block)
+{
+    if (block < nblocks - 1)
         return PIECE_BLOCKLEN;
     else {
-        uint32_t allbutlast = PIECE_BLOCKLEN * (pc->nblocks - 1);
-        return torrent_piece_size(pc->n->tp, pc->index) - allbutlast;
+        uint32_t allbutlast = PIECE_BLOCKLEN * (nblocks - 1);
+        return torrent_piece_size(tp, piece) - allbutlast;
     }
 }
 
@@ -52,11 +59,14 @@ torrent_activate(struct torrent *tp)
 void
 torrent_deactivate(struct torrent *tp)
 {
-
+    tp->state = T_STOPPING;
+    tr_stop(tp);
+    net_del_torrent(tp);
+    cm_stop(tp);
 }
 
 int
-torrent_create(struct torrent **res, const char *path)
+torrent_load(struct torrent **res, const char *path)
 {
     struct metainfo *mi;
     int error;
@@ -88,16 +98,17 @@ torrent_create(struct torrent **res, const char *path)
     return error;
 }
 
-void torrent_cm_cb(struct torrent *tp, enum cm_state state)
+void
+torrent_on_cm_started(struct torrent *tp)
 {
-    switch (state) {
-    case CM_STARTED:
-        net_add_torrent(tp);
-        tr_start(tp);
-        break;
-    case CM_STOPPED:
-        break;
-    case CM_ERROR:
-        break;
-    }
+    net_add_torrent(tp);
+    tr_start(tp);
+    tp->state = T_ACTIVE;
+}
+
+void
+torrent_on_cm_stopped(struct torrent *tp)
+{
+    assert(tp->state == T_STOPPING);
+    tp->state = T_INACTIVE;
 }
diff --git a/btpd/torrent.h b/btpd/torrent.h
index 6ce07a4..73a26c7 100644
--- a/btpd/torrent.h
+++ b/btpd/torrent.h
@@ -25,19 +25,16 @@ struct torrent {
 
 BTPDQ_HEAD(torrent_tq, torrent);
 
-int torrent_create(struct torrent **res, const char *path);
+int torrent_load(struct torrent **res, const char *path);
 void torrent_activate(struct torrent *tp);
 void torrent_deactivate(struct torrent *tp);
 
-off_t torrent_piece_size(struct torrent *tp, uint32_t index);
-uint32_t torrent_block_size(struct piece *pc, uint32_t index);
+off_t torrent_piece_size(struct torrent *tp, uint32_t piece);
+uint32_t torrent_piece_blocks(struct torrent *tp, uint32_t piece);
+uint32_t torrent_block_size(struct torrent *tp, uint32_t piece,
+    uint32_t nblocks, uint32_t block);
 
-enum cm_state {
-    CM_STARTED,
-    CM_STOPPED,
-    CM_ERROR
-};
-
-void torrent_cm_cb(struct torrent *tp, enum cm_state state);
+void torrent_on_cm_stopped(struct torrent *tp);
+void torrent_on_cm_started(struct torrent *tp);
 
 #endif