about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--btpd/btpd.c2
-rw-r--r--btpd/content.c671
-rw-r--r--btpd/download_subr.c5
-rw-r--r--btpd/torrent.c45
-rw-r--r--btpd/torrent.h17
5 files changed, 569 insertions, 171 deletions
diff --git a/btpd/btpd.c b/btpd/btpd.c
index 1e10415..1ecbae8 100644
--- a/btpd/btpd.c
+++ b/btpd/btpd.c
@@ -115,7 +115,7 @@ load_library(void)
     for (int i = 0; i < ne; i++) {
         struct torrent *tp;
         struct dirent *e = entries[i];
-        if (torrent_create(&tp, e->d_name) == 0)
+        if (torrent_load(&tp, e->d_name) == 0)
             btpd_add_torrent(tp);
         free(e);
     }
diff --git a/btpd/content.c b/btpd/content.c
index 8dbc994..7a3bd96 100644
--- a/btpd/content.c
+++ b/btpd/content.c
@@ -1,6 +1,10 @@
+#include <sys/types.h>
+#include <sys/stat.h>
+
 #include <fcntl.h>
 #include <math.h>
 #include <pthread.h>
+#include <stdio.h>
 #include <string.h>
 #include <unistd.h>
 
@@ -9,24 +13,27 @@
 #include "btpd.h"
 #include "stream.h"
 
-struct data {
+struct cm_write_data {
     uint32_t begin;
     uint8_t *buf;
     size_t len;
-    BTPDQ_ENTRY(data) entry;
+    BTPDQ_ENTRY(cm_write_data) entry;
 };
 
-BTPDQ_HEAD(data_tq, data);
+BTPDQ_HEAD(cm_write_data_tq, cm_write_data);
 
 enum cm_op_type {
     CM_ALLOC,
+    CM_SAVE,
+    CM_START,
     CM_TEST,
     CM_WRITE
 };
 
 struct cm_op {
     struct torrent *tp;
-
+    int error;
+    int received;
     enum cm_op_type type;
     union {
         struct {
@@ -34,6 +41,9 @@ struct cm_op {
             uint32_t pos;
         } alloc;
         struct {
+            volatile sig_atomic_t cancel;
+        } start;
+        struct {
             uint32_t piece;
             uint32_t pos;
             int ok;
@@ -41,13 +51,10 @@ struct cm_op {
         struct {
             uint32_t piece;
             uint32_t pos;
-            struct data_tq q;
+            struct cm_write_data_tq q;
         } write;
     } u;
 
-    int error;
-    char *errmsg;
-
     BTPDQ_ENTRY(cm_op) cm_entry;
     BTPDQ_ENTRY(cm_op) td_entry;
 };
@@ -55,6 +62,8 @@ struct cm_op {
 BTPDQ_HEAD(cm_op_tq, cm_op);
 
 struct content {
+    int active;
+
     uint32_t npieces_got;
 
     off_t ncontent_bytes;
@@ -74,10 +83,14 @@ struct content {
 
 #define ZEROBUFLEN (1 << 14)
 
+struct cm_comm {
+    struct cm_op_tq q;
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+};
+
+static struct cm_comm m_long_comm, m_short_comm;
 static const uint8_t m_zerobuf[ZEROBUFLEN];
-static struct cm_op_tq m_tdq = BTPDQ_HEAD_INITIALIZER(m_tdq);
-static pthread_mutex_t m_tdq_lock;
-static pthread_cond_t m_tdq_cond;
 
 static int
 fd_cb_rd(const char *path, int *fd, void *arg)
@@ -95,6 +108,27 @@ fd_cb_wr(const char *path, int *fd, void *arg)
 }
 
 static void
+cm_td_post_common(struct cm_comm *comm, struct cm_op *op)
+{
+    pthread_mutex_lock(&comm->lock);
+    BTPDQ_INSERT_TAIL(&comm->q, op, td_entry);
+    pthread_mutex_unlock(&comm->lock);
+    pthread_cond_signal(&comm->cond);
+}
+
+static void
+cm_td_post_long(struct cm_op *op)
+{
+    cm_td_post_common(&m_long_comm, op);
+}
+
+static void
+cm_td_post_short(struct cm_op *op)
+{
+    cm_td_post_common(&m_short_comm, op);
+}
+
+static void
 run_todo(struct content *cm)
 {
     struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
@@ -107,10 +141,65 @@ run_todo(struct content *cm)
         return;
     }
 
-    pthread_mutex_lock(&m_tdq_lock);
-    BTPDQ_INSERT_TAIL(&m_tdq, op, td_entry);
-    pthread_mutex_unlock(&m_tdq_lock);
-    pthread_cond_signal(&m_tdq_cond);
+    if (op->type != CM_START)
+        cm_td_post_short(op);
+    else
+        cm_td_post_long(op);
+}
+
+static void
+add_todo(struct content *cm, struct cm_op *op)
+{
+    int was_empty = BTPDQ_EMPTY(&cm->todoq);
+    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
+    if (was_empty)
+        run_todo(cm);
+}
+
+void
+cm_destroy(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+    bts_close(cm->rds);
+    free(cm->piece_field);
+    free(cm->block_field);
+    free(cm->hold_field);
+    free(cm->pos_field);
+    tp->cm = NULL;
+    torrent_on_cm_stopped(tp);
+}
+
+void
+cm_save(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+    struct cm_op *op = btpd_calloc(1, sizeof(*op));
+    op->tp = tp;
+    op->type = CM_SAVE;
+    add_todo(cm, op);
+}
+
+void
+cm_stop(struct torrent *tp)
+{
+    struct content *cm = tp->cm;
+
+    struct cm_op *op = BTPDQ_FIRST(&cm->todoq);
+    if (op != NULL && op->type == CM_START) {
+        pthread_mutex_lock(&m_long_comm.lock);
+        if (op->received)
+            op->u.start.cancel = 1;
+        else {
+            BTPDQ_REMOVE(&m_long_comm.q, op, td_entry);
+            BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
+            free(op);
+        }
+        pthread_mutex_unlock(&m_long_comm.lock);
+    } else if (cm->npieces_got < tp->meta.npieces)
+        cm_save(tp);
+
+    if (BTPDQ_EMPTY(&cm->todoq))
+        cm_destroy(tp);
 }
 
 static void
@@ -121,19 +210,28 @@ cm_td_cb(void *arg)
     struct content *cm = tp->cm;
 
     if (op->error)
-        btpd_err("%s", op->errmsg);
+        btpd_err("IO error for %s.\n", tp->relpath);
 
     switch (op->type) {
     case CM_ALLOC:
         set_bit(cm->pos_field, op->u.alloc.pos);
         clear_bit(cm->hold_field, op->u.alloc.piece);
         break;
+    case CM_START:
+        if (cm->active) {
+            assert(!op->u.start.cancel);
+            torrent_on_cm_started(tp);
+        }
+        break;
     case CM_TEST:
         if (op->u.test.ok) {
+            assert(cm->npieces_got < tp->meta.npieces);
             cm->npieces_got++;
             set_bit(cm->piece_field, op->u.test.piece);
             if (tp->net != NULL)
                 dl_on_ok_piece(op->tp->net, op->u.test.piece);
+            if (cm_full(tp))
+                cm_save(tp);
         } else {
             cm->ncontent_bytes -= torrent_piece_size(tp, op->u.test.piece);
             bzero(cm->block_field + op->u.test.piece * cm->bppbf, cm->bppbf);
@@ -141,116 +239,16 @@ cm_td_cb(void *arg)
                 dl_on_bad_piece(tp->net, op->u.test.piece);
         }
         break;
-    default:
+    case CM_SAVE:
+    case CM_WRITE:
         break;
     }
     BTPDQ_REMOVE(&cm->todoq, op, cm_entry);
     free(op);
     if (!BTPDQ_EMPTY(&cm->todoq))
         run_todo(cm);
-}
-
-static int
-test_hash(struct torrent *tp, uint8_t *hash, uint32_t index)
-{
-    if (tp->meta.piece_hash != NULL)
-        return bcmp(hash, tp->meta.piece_hash[index], SHA_DIGEST_LENGTH);
-    else {
-        char piece_hash[SHA_DIGEST_LENGTH];
-        int fd;
-        int err;
-
-        err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath);
-        if (err != 0)
-            btpd_err("test_hash: %s\n", strerror(err));
-
-        lseek(fd, tp->meta.pieces_off + index * SHA_DIGEST_LENGTH, SEEK_SET);
-        read(fd, piece_hash, SHA_DIGEST_LENGTH);
-        close(fd);
-
-        return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
-    }
-}
-
-static void
-cm_td_alloc(struct cm_op *op)
-{
-    struct bt_stream *bts;
-    off_t len = torrent_piece_size(op->tp, op->u.alloc.pos);
-    off_t off = op->tp->meta.piece_length * op->u.alloc.pos;
-    bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
-    while (len > 0) {
-        size_t wlen = min(ZEROBUFLEN, len);
-        bts_put(bts, off, m_zerobuf, wlen);
-        len -= wlen;
-        off += wlen;
-    }
-    bts_close(bts);
-}
-
-static void
-cm_td_test(struct cm_op *op)
-{
-    uint8_t hash[SHA_DIGEST_LENGTH];
-    struct bt_stream *bts;
-    bts_open(&bts, &op->tp->meta, fd_cb_rd, op->tp);
-    bts_sha(bts, op->u.test.pos * op->tp->meta.piece_length,
-        torrent_piece_size(op->tp, op->u.test.piece), hash);
-    bts_close(bts);
-    op->u.test.ok = test_hash(op->tp, hash, op->u.test.piece) == 0;
-}
-
-static void
-cm_td_write(struct cm_op *op)
-{
-    struct data *d, *next;
-    off_t base = op->tp->meta.piece_length * op->u.write.pos;
-    struct bt_stream *bts;
-    bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp);
-    BTPDQ_FOREACH(d, &op->u.write.q, entry)
-        bts_put(bts, base + d->begin, d->buf, d->len);
-    bts_close(bts);
-    BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
-        free(d);
-}
-
-static void
-cm_td(void *arg)
-{
-    for (;;) {
-        pthread_mutex_lock(&m_tdq_lock);
-        while (BTPDQ_EMPTY(&m_tdq))
-            pthread_cond_wait(&m_tdq_cond, &m_tdq_lock);
-        struct cm_op *op = BTPDQ_FIRST(&m_tdq);
-        BTPDQ_REMOVE(&m_tdq, op, td_entry);
-        pthread_mutex_unlock(&m_tdq_lock);
-
-        switch (op->type) {
-        case CM_ALLOC:
-            cm_td_alloc(op);
-            break;
-        case CM_TEST:
-            cm_td_test(op);
-            break;
-        case CM_WRITE:
-            cm_td_write(op);
-            break;
-        default:
-            abort();
-        }
-        td_post_begin();
-        td_post(cm_td_cb, op);
-        td_post_end();
-    }
-}
-
-void
-cm_init(void)
-{
-    pthread_t td;
-    pthread_mutex_init(&m_tdq_lock, NULL);
-    pthread_cond_init(&m_tdq_cond, NULL);
-    pthread_create(&td, NULL, (void *(*)(void *))cm_td, NULL);
+    else if (!cm->active)
+        cm_destroy(tp);
 }
 
 int
@@ -259,6 +257,7 @@ cm_start(struct torrent *tp)
     int err;
     struct content *cm = btpd_calloc(1, sizeof(*cm));
     size_t pfield_size = ceil(tp->meta.npieces / 8.0);
+    cm->active = 1;
     cm->bppbf = ceil((double)tp->meta.piece_length / (1 << 17));
     cm->piece_field = btpd_calloc(pfield_size, 1);
     cm->hold_field = btpd_calloc(pfield_size, 1);
@@ -269,11 +268,12 @@ cm_start(struct torrent *tp)
 
     if ((err = bts_open(&cm->rds, &tp->meta, fd_cb_rd, tp)) != 0)
         btpd_err("Error opening stream (%s).\n", strerror(err));
-    if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
-        btpd_err("Error opening stream (%s).\n", strerror(err));
-
     tp->cm = cm;
-    torrent_cm_cb(tp, CM_STARTED);
+
+    struct cm_op *op = btpd_calloc(1, sizeof(*op));
+    op->tp = tp;
+    op->type = CM_START;
+    add_todo(cm, op);
     return 0;
 }
 
@@ -289,22 +289,18 @@ cm_get_bytes(struct torrent *tp, uint32_t piece, uint32_t begin, size_t len,
     return 0;
 }
 
-void
-cm_prealloc(struct torrent *tp, uint32_t piece)
+static void
+cm_post_alloc(struct torrent *tp, uint32_t piece)
 {
     struct content *cm = tp->cm;
-    if (has_bit(cm->pos_field, piece))
-        return;
-
     set_bit(cm->hold_field, piece);
 
-    int was_empty = BTPDQ_EMPTY(&cm->todoq);
     struct cm_op *op = btpd_calloc(1, sizeof(*op));
     op->tp = tp;
     op->type = CM_ALLOC;
     op->u.alloc.piece = piece;
     op->u.alloc.pos = piece;
-    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
+    add_todo(cm, op);
 
     op = btpd_calloc(1, sizeof(*op));
     op->tp = tp;
@@ -312,25 +308,40 @@ cm_prealloc(struct torrent *tp, uint32_t piece)
     op->u.write.piece = piece;
     op->u.write.pos = piece;
     BTPDQ_INIT(&op->u.write.q);
-    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
+    add_todo(cm, op);
+}
 
-    if (was_empty)
-        run_todo(cm);
+void
+cm_prealloc(struct torrent *tp, uint32_t piece)
+{
+    struct content *cm = tp->cm;
+
+    if (cm_alloc_size == 0)
+        set_bit(cm->pos_field, piece);
+    else {
+        unsigned npieces = ceil((double)cm_alloc_size / tp->meta.piece_length);
+        uint32_t start = piece - piece % npieces;
+        uint32_t end = min(start + npieces, tp->meta.npieces);
+
+        while (start < end) {
+            if ((!has_bit(cm->pos_field, start)
+                    && !has_bit(cm->hold_field, start)))
+                cm_post_alloc(tp, start);
+            start++;
+        }
+    }
 }
 
 void
 cm_test_piece(struct torrent *tp, uint32_t piece)
 {
     struct content *cm = tp->cm;
-    int was_empty = BTPDQ_EMPTY(&cm->todoq);
     struct cm_op *op = btpd_calloc(1, sizeof(*op));
     op->tp = tp;
     op->type = CM_TEST;
     op->u.test.piece = piece;
     op->u.test.pos = piece;
-    BTPDQ_INSERT_TAIL(&cm->todoq, op, cm_entry);
-    if (was_empty)
-        run_todo(cm);
+    add_todo(cm, op);
 }
 
 int
@@ -341,7 +352,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
     struct content *cm = tp->cm;
 
     if (has_bit(cm->hold_field, piece)) {
-        struct data *d = btpd_calloc(1, sizeof(*d) + len);
+        struct cm_write_data *d = btpd_calloc(1, sizeof(*d) + len);
         d->begin = begin;
         d->len = len;
         d->buf = (uint8_t *)(d + 1);
@@ -350,7 +361,7 @@ cm_put_bytes(struct torrent *tp, uint32_t piece, uint32_t begin,
         BTPDQ_FOREACH(op, &cm->todoq, cm_entry)
             if (op->type == CM_WRITE && op->u.write.piece == piece)
                 break;
-        struct data *it;
+        struct cm_write_data *it;
         BTPDQ_FOREACH(it, &op->u.write.q, entry)
             if (it->begin > begin) {
                 BTPDQ_INSERT_BEFORE(it, d, entry);
@@ -407,3 +418,383 @@ cm_has_piece(struct torrent *tp, uint32_t piece)
 {
     return has_bit(tp->cm->piece_field, piece);
 }
+
+static int
+test_hash(struct torrent *tp, uint8_t *hash, uint32_t piece)
+{
+    if (tp->meta.piece_hash != NULL)
+        return bcmp(hash, tp->meta.piece_hash[piece], SHA_DIGEST_LENGTH);
+    else {
+        char piece_hash[SHA_DIGEST_LENGTH];
+        int fd;
+        int err;
+
+        err = vopen(&fd, O_RDONLY, "library/%s/torrent", tp->relpath);
+        if (err != 0)
+            btpd_err("test_hash: %s\n", strerror(err));
+
+        lseek(fd, tp->meta.pieces_off + piece * SHA_DIGEST_LENGTH, SEEK_SET);
+        read(fd, piece_hash, SHA_DIGEST_LENGTH);
+        close(fd);
+
+        return bcmp(hash, piece_hash, SHA_DIGEST_LENGTH);
+    }
+}
+
+static int
+test_piece(struct torrent *tp, uint32_t pos, uint32_t piece, int *ok)
+{
+    int err;
+    uint8_t hash[SHA_DIGEST_LENGTH];
+    struct bt_stream *bts;
+    if ((err = bts_open(&bts, &tp->meta, fd_cb_rd, tp)) != 0)
+        return err;
+    if ((err = bts_sha(bts, pos * tp->meta.piece_length,
+             torrent_piece_size(tp, piece), hash)) != 0)
+        return err;;
+    bts_close(bts);
+    *ok = test_hash(tp, hash, piece) == 0;
+    return 0;
+}
+
+static void
+cm_td_alloc(struct cm_op *op)
+{
+    struct torrent *tp = op->tp;
+    struct content *cm = tp->cm;
+    uint32_t pos = op->u.alloc.pos;
+    struct bt_stream *bts;
+    int err;
+
+    assert(!has_bit(cm->pos_field, pos));
+
+    if ((err = bts_open(&bts, &tp->meta, fd_cb_wr, tp)) != 0)
+        goto out;
+
+    off_t len = torrent_piece_size(tp, pos);
+    off_t off = tp->meta.piece_length * pos;
+    while (len > 0) {
+        size_t wlen = min(ZEROBUFLEN, len);
+        if ((err = bts_put(bts, off, m_zerobuf, wlen)) != 0) {
+            bts_close(bts);
+            goto out;
+        }
+        len -= wlen;
+        off += wlen;
+    }
+    err = bts_close(bts);
+out:
+    if (err != 0)
+        op->error = 1;
+}
+
+static int
+test_torrent(struct torrent *tp, volatile sig_atomic_t *cancel)
+{
+    int err;
+    FILE *fp;
+    uint8_t (*hashes)[SHA_DIGEST_LENGTH];
+    uint8_t hash[SHA_DIGEST_LENGTH];
+
+    if ((err = vfopen(&fp, "r", "library/%s/torrent", tp->relpath)) != 0)
+        return err;
+
+    hashes = btpd_malloc(tp->meta.npieces * SHA_DIGEST_LENGTH);
+    fseek(fp, tp->meta.pieces_off, SEEK_SET);
+    fread(hashes, SHA_DIGEST_LENGTH, tp->meta.npieces, fp);
+    fclose(fp);
+
+    tp->meta.piece_hash = hashes;
+
+    struct content *cm = tp->cm;
+    for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
+        if (!has_bit(cm->pos_field, piece))
+            continue;
+        err = bts_sha(cm->rds, piece * tp->meta.piece_length,
+            torrent_piece_size(tp, piece), hash);
+        if (err != 0)
+            break;
+        if (test_hash(tp, hash, piece) == 0)
+            set_bit(tp->cm->piece_field, piece);
+        if (*cancel) {
+            err = EINTR;
+            break;
+        }
+    }
+
+    tp->meta.piece_hash = NULL;
+    free(hashes);
+    return err;
+}
+
+int
+stat_and_adjust(struct torrent *tp, struct stat ret[])
+{
+    char path[PATH_MAX];
+    for (int i = 0; i < tp->meta.nfiles; i++) {
+        snprintf(path, PATH_MAX, "library/%s/content/%s", tp->relpath,
+            tp->meta.files[i].path);
+again:
+        if (stat(path, &ret[i]) == -1) {
+            if (errno == ENOENT) {
+                ret[i].st_mtime = -1;
+                ret[i].st_size = -1;
+            } else
+                return errno;
+        }
+        if (ret[i].st_size > tp->meta.files[i].length) {
+            if (truncate(path, tp->meta.files[i].length) != 0)
+                return errno;
+            goto again;
+        }
+    }
+    return 0;
+}
+
+static int
+load_resume(struct torrent *tp, struct stat sbs[])
+{
+    int err, ver;
+    FILE *fp;
+    size_t pfsiz = ceil(tp->meta.npieces / 8.0);
+    size_t bfsiz = tp->meta.npieces * tp->cm->bppbf;
+
+    if ((err = vfopen(&fp, "r" , "library/%s/resume", tp->relpath)) != 0)
+        return err;
+
+    if (fscanf(fp, "%d\n", &ver) != 1)
+        goto invalid;
+    if (ver != 1)
+        goto invalid;
+    for (int i = 0; i < tp->meta.nfiles; i++) {
+        long long size;
+        time_t time;
+        if (fscanf(fp, "%qd %ld\n", &size, &time) != 2)
+            goto invalid;
+        if (sbs[i].st_size != size || sbs[i].st_mtime != time)
+            goto invalid;
+    }
+    if (fread(tp->cm->piece_field, 1, pfsiz, fp) != pfsiz)
+        goto invalid;
+    if (fread(tp->cm->block_field, 1, bfsiz, fp) != bfsiz)
+        goto invalid;
+    fclose(fp);
+    return 0;
+invalid:
+    fclose(fp);
+    bzero(tp->cm->piece_field, pfsiz);
+    bzero(tp->cm->block_field, bfsiz);
+    return EINVAL;
+}
+
+static int
+save_resume(struct torrent *tp)
+{
+    int err;
+    FILE *fp;
+    struct stat sbs[tp->meta.nfiles];
+    if ((err = stat_and_adjust(tp, sbs)) != 0)
+        return err;
+    if ((err = vfopen(&fp, "wb", "library/%s/resume", tp->relpath)) != 0)
+        return err;
+    fprintf(fp, "%d\n", 1);
+    for (int i = 0; i < tp->meta.nfiles; i++)
+        fprintf(fp, "%qd %ld\n", (long long)sbs[i].st_size, sbs[i].st_mtime);
+    fwrite(tp->cm->piece_field, 1, ceil(tp->meta.npieces / 8.0), fp);
+    fwrite(tp->cm->block_field, 1, tp->meta.npieces * tp->cm->bppbf, fp);
+    if (fclose(fp) != 0)
+        err = errno;
+    return err;
+}
+
+static void
+cm_td_save(struct cm_op *op)
+{
+    int err;
+    struct torrent *tp = op->tp;
+    struct content *cm = tp->cm;
+    struct bt_stream *bts = cm->wrs;
+
+    cm->wrs = NULL;
+    if ((err = bts_close(bts)) != 0)
+        goto out;
+
+    for (int i = 0; i < tp->meta.nfiles; i++) {
+        int lerr;
+        if ((lerr = vfsync("library/%s/content/%s", tp->relpath,
+                 tp->meta.files[i])) != 0 && lerr != ENOENT)
+            err = lerr;
+    }
+    if (err != 0)
+        goto out;
+    save_resume(tp);
+out:
+    if (err != 0)
+        op->error = 1;
+}
+
+static void
+cm_td_start(struct cm_op *op)
+{
+    int err;
+    struct stat sbs[op->tp->meta.nfiles];
+    struct torrent *tp = op->tp;
+    struct content *cm = tp->cm;
+
+    if ((err = stat_and_adjust(op->tp, sbs)) != 0)
+        goto out;
+
+    if (load_resume(tp, sbs) != 0) {
+        memset(cm->pos_field, 0xff, ceil(tp->meta.npieces / 8.0));
+        off_t off = 0;
+        for (int i = 0; i < tp->meta.nfiles; i++) {
+            if (sbs[i].st_size == -1 || sbs[i].st_size == 0) {
+                uint32_t start = off / tp->meta.piece_length;
+                uint32_t end = (off + tp->meta.files[i].length - 1) /
+                    tp->meta.piece_length;
+                while (start <= end) {
+                    clear_bit(cm->pos_field, start);
+                    start++;
+                }
+            } else if (sbs[i].st_size < tp->meta.files[i].length) {
+                uint32_t start = (off + sbs[i].st_size) /
+                    tp->meta.piece_length;
+                uint32_t end = (off + tp->meta.files[i].length - 1) /
+                    tp->meta.piece_length;
+                while (start <= end) {
+                    clear_bit(cm->pos_field, start);
+                    start++;
+                }
+            }
+            off += tp->meta.files[i].length;
+        }
+        if (op->u.start.cancel)
+            goto out;
+        if ((err = test_torrent(tp, &op->u.start.cancel)) != 0)
+            goto out;
+        save_resume(tp);
+    }
+
+    bzero(cm->pos_field, ceil(tp->meta.npieces / 8.0));
+    for (uint32_t piece = 0; piece < tp->meta.npieces; piece++) {
+        if (cm_has_piece(tp, piece)) {
+            cm->ncontent_bytes += torrent_piece_size(tp, piece);
+            cm->npieces_got++;
+            set_bit(cm->pos_field, piece);
+            continue;
+        }
+        uint8_t *bf = cm->block_field + cm->bppbf * piece;
+        uint32_t nblocks = torrent_piece_blocks(tp, piece);
+        uint32_t nblocks_got = 0;
+        for (uint32_t i = 0; i < nblocks; i++) {
+            if (has_bit(bf, i)) {
+                nblocks_got++;
+                cm->ncontent_bytes +=
+                    torrent_block_size(tp, piece, nblocks, i);
+            }
+        }
+        if (nblocks_got == nblocks) {
+            int ok;
+            if (((err = test_piece(tp, piece, piece, &ok)) != 0
+                    || op->u.start.cancel))
+                goto out;
+            if (ok) {
+                set_bit(cm->pos_field, piece);
+                set_bit(cm->piece_field, piece);
+            } else
+                bzero(bf, cm->bppbf);
+        } else if (nblocks_got > 0)
+            set_bit(cm->pos_field, piece);
+    }
+
+    if (cm->npieces_got < tp->meta.npieces)
+        if ((err = bts_open(&cm->wrs, &tp->meta, fd_cb_wr, tp)) != 0)
+            goto out;
+out:
+    if (!op->u.start.cancel && err != 0)
+        op->error = 1;
+}
+
+static void
+cm_td_test(struct cm_op *op)
+{
+    if (test_piece(op->tp, op->u.test.pos, op->u.test.piece,
+            &op->u.test.ok) != 0)
+        op->error = 1;
+}
+
+static void
+cm_td_write(struct cm_op *op)
+{
+    int err;
+    struct cm_write_data *d, *next;
+    off_t base = op->tp->meta.piece_length * op->u.write.pos;
+    struct bt_stream *bts;
+    if ((err = bts_open(&bts, &op->tp->meta, fd_cb_wr, op->tp)) != 0)
+        goto out;
+    BTPDQ_FOREACH(d, &op->u.write.q, entry)
+        if ((err = bts_put(bts, base + d->begin, d->buf, d->len)) != 0) {
+            bts_close(bts);
+            goto out;
+        }
+    err = bts_close(bts);
+out:
+    BTPDQ_FOREACH_MUTABLE(d, &op->u.write.q, entry, next)
+        free(d);
+    if (err)
+        op->error = 1;
+}
+
+static void
+cm_td(void *arg)
+{
+    struct cm_comm *comm = arg;
+    struct cm_op *op;
+    for (;;) {
+        pthread_mutex_lock(&comm->lock);
+        while (BTPDQ_EMPTY(&comm->q))
+            pthread_cond_wait(&comm->cond, &comm->lock);
+
+        op = BTPDQ_FIRST(&comm->q);
+        BTPDQ_REMOVE(&comm->q, op, td_entry);
+        op->received = 1;
+        pthread_mutex_unlock(&comm->lock);
+
+        switch (op->type) {
+        case CM_ALLOC:
+            cm_td_alloc(op);
+            break;
+        case CM_SAVE:
+            cm_td_save(op);
+            break;
+        case CM_START:
+            cm_td_start(op);
+            break;
+        case CM_TEST:
+            cm_td_test(op);
+            break;
+        case CM_WRITE:
+            cm_td_write(op);
+            break;
+        default:
+            abort();
+        }
+        td_post_begin();
+        td_post(cm_td_cb, op);
+        td_post_end();
+    }
+}
+
+void
+cm_init(void)
+{
+    pthread_t td;
+    BTPDQ_INIT(&m_long_comm.q);
+    pthread_mutex_init(&m_long_comm.lock, NULL);
+    pthread_cond_init(&m_long_comm.cond, NULL);
+    pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_long_comm);
+    BTPDQ_INIT(&m_short_comm.q);
+    pthread_mutex_init(&m_short_comm.lock, NULL);
+    pthread_cond_init(&m_short_comm.cond, NULL);
+    pthread_create(&td, NULL, (void *(*)(void *))cm_td, &m_short_comm);
+}
diff --git a/btpd/download_subr.c b/btpd/download_subr.c
index 9164e72..df2aaaf 100644
--- a/btpd/download_subr.c
+++ b/btpd/download_subr.c
@@ -37,9 +37,8 @@ piece_alloc(struct net *n, uint32_t index)
     struct piece *pc;
     size_t mem, field, blocks;
     unsigned nblocks;
-    off_t piece_length = torrent_piece_size(n->tp, index);
 
-    nblocks = (unsigned)ceil((double)piece_length / PIECE_BLOCKLEN);
+    nblocks = torrent_piece_blocks(n->tp, index);
     blocks = sizeof(pc->blocks[0]) * nblocks;
     field = (size_t)ceil(nblocks / 8.0);
     mem = sizeof(*pc) + field + blocks;
@@ -63,7 +62,7 @@ piece_alloc(struct net *n, uint32_t index)
     pc->blocks = (struct block *)(pc->down_field + field);
     for (unsigned i = 0; i < nblocks; i++) {
         uint32_t start = i * PIECE_BLOCKLEN;
-        uint32_t len = torrent_block_size(pc, i);
+        uint32_t len = torrent_block_size(n->tp, index, nblocks, i);
         struct block *blk = &pc->blocks[i];
         blk->pc = pc;
         BTPDQ_INIT(&blk->reqs);
diff --git a/btpd/torrent.c b/btpd/torrent.c
index 909b18b..c362ad5 100644
--- a/btpd/torrent.c
+++ b/btpd/torrent.c
@@ -31,13 +31,20 @@ torrent_piece_size(struct torrent *tp, uint32_t index)
 }
 
 uint32_t
-torrent_block_size(struct piece *pc, uint32_t index)
+torrent_piece_blocks(struct torrent *tp, uint32_t piece)
 {
-    if (index < pc->nblocks - 1)
+    return ceil(torrent_piece_size(tp, piece) / (double)PIECE_BLOCKLEN);
+}
+
+uint32_t
+torrent_block_size(struct torrent *tp, uint32_t piece, uint32_t nblocks,
+    uint32_t block)
+{
+    if (block < nblocks - 1)
         return PIECE_BLOCKLEN;
     else {
-        uint32_t allbutlast = PIECE_BLOCKLEN * (pc->nblocks - 1);
-        return torrent_piece_size(pc->n->tp, pc->index) - allbutlast;
+        uint32_t allbutlast = PIECE_BLOCKLEN * (nblocks - 1);
+        return torrent_piece_size(tp, piece) - allbutlast;
     }
 }
 
@@ -52,11 +59,14 @@ torrent_activate(struct torrent *tp)
 void
 torrent_deactivate(struct torrent *tp)
 {
-
+    tp->state = T_STOPPING;
+    tr_stop(tp);
+    net_del_torrent(tp);
+    cm_stop(tp);
 }
 
 int
-torrent_create(struct torrent **res, const char *path)
+torrent_load(struct torrent **res, const char *path)
 {
     struct metainfo *mi;
     int error;
@@ -88,16 +98,17 @@ torrent_create(struct torrent **res, const char *path)
     return error;
 }
 
-void torrent_cm_cb(struct torrent *tp, enum cm_state state)
+void
+torrent_on_cm_started(struct torrent *tp)
 {
-    switch (state) {
-    case CM_STARTED:
-        net_add_torrent(tp);
-        tr_start(tp);
-        break;
-    case CM_STOPPED:
-        break;
-    case CM_ERROR:
-        break;
-    }
+    net_add_torrent(tp);
+    tr_start(tp);
+    tp->state = T_ACTIVE;
+}
+
+void
+torrent_on_cm_stopped(struct torrent *tp)
+{
+    assert(tp->state == T_STOPPING);
+    tp->state = T_INACTIVE;
 }
diff --git a/btpd/torrent.h b/btpd/torrent.h
index 6ce07a4..73a26c7 100644
--- a/btpd/torrent.h
+++ b/btpd/torrent.h
@@ -25,19 +25,16 @@ struct torrent {
 
 BTPDQ_HEAD(torrent_tq, torrent);
 
-int torrent_create(struct torrent **res, const char *path);
+int torrent_load(struct torrent **res, const char *path);
 void torrent_activate(struct torrent *tp);
 void torrent_deactivate(struct torrent *tp);
 
-off_t torrent_piece_size(struct torrent *tp, uint32_t index);
-uint32_t torrent_block_size(struct piece *pc, uint32_t index);
+off_t torrent_piece_size(struct torrent *tp, uint32_t piece);
+uint32_t torrent_piece_blocks(struct torrent *tp, uint32_t piece);
+uint32_t torrent_block_size(struct torrent *tp, uint32_t piece,
+    uint32_t nblocks, uint32_t block);
 
-enum cm_state {
-    CM_STARTED,
-    CM_STOPPED,
-    CM_ERROR
-};
-
-void torrent_cm_cb(struct torrent *tp, enum cm_state state);
+void torrent_on_cm_stopped(struct torrent *tp);
+void torrent_on_cm_started(struct torrent *tp);
 
 #endif