summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard Nyberg <rnyberg@murmeldjur.se>2006-01-06 22:24:25 +0000
committerRichard Nyberg <rnyberg@murmeldjur.se>2006-01-06 22:24:25 +0000
commit7f8f5dd03bcb53923520a4b11770797478e8e113 (patch)
tree75398352b47a28bc79484db0d3deb35f0f6e3ef7
parent74b665efe18365fabac87bfbaa34e1e2fb815c1d (diff)
downloadbtpd-7f8f5dd03bcb53923520a4b11770797478e8e113.tar.gz
btpd-7f8f5dd03bcb53923520a4b11770797478e8e113.zip
* Added a method for other threads to execute callbacks in the main thread.
* Added a thread and api for http requests. The thread uses the curl multi
  interface and will be used by the tracker code. The tracker code currently
  use forked processes to do its bidding. 

-rw-r--r--btpd/Makefile.am1
-rw-r--r--btpd/btpd.c86
-rw-r--r--btpd/btpd.h7
-rw-r--r--btpd/http.c245
-rw-r--r--btpd/http.h29
5 files changed, 367 insertions, 1 deletions
diff --git a/btpd/Makefile.am b/btpd/Makefile.am
index 967f889..cd3870f 100644
--- a/btpd/Makefile.am
+++ b/btpd/Makefile.am
@@ -3,6 +3,7 @@ btpd_SOURCES=\
 	btpd.c btpd.h\
 	content.c content.h\
 	download.c download_subr.c download.h\
+	http.c http.h\
 	main.c\
 	net.c net.h\
 	net_buf.c net_buf.h\
diff --git a/btpd/btpd.c b/btpd/btpd.c
index 516602e..146cba0 100644
--- a/btpd/btpd.c
+++ b/btpd/btpd.c
@@ -17,6 +17,7 @@
 #include <getopt.h>
 #include <math.h>
 #include <locale.h>
+#include <pthread.h>
 #include <pwd.h>
 #include <signal.h>
 #include <stdio.h>
@@ -26,6 +27,7 @@
 #include <unistd.h>
 
 #include "btpd.h"
+#include "http.h"
 
 struct child {
     pid_t pid;
@@ -160,7 +162,87 @@ load_library(void)
     free(entries);
 }
 
-extern void ipc_init(void);
+struct td_cb {
+    void (*cb)(void *);
+    void *arg;
+    BTPDQ_ENTRY(td_cb) entry;
+};
+
+BTPDQ_HEAD(td_cb_tq, td_cb);
+
+static int m_td_rd, m_td_wr;
+static struct event m_td_ev;
+static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs);
+static pthread_mutex_t m_td_lock;
+
+void
+td_acquire_lock(void)
+{
+    pthread_mutex_lock(&m_td_lock);
+}
+
+void
+td_release_lock(void)
+{
+    pthread_mutex_unlock(&m_td_lock);
+}
+
+void
+td_post(void (*fun)(void *), void *arg)
+{
+    struct td_cb *cb = btpd_calloc(1, sizeof(*cb));
+    cb->cb = fun;
+    cb->arg = arg;
+    BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry);
+}
+
+void
+td_post_end(void)
+{
+    char c = '1';
+    td_release_lock();
+    write(m_td_wr, &c, sizeof(c));
+}
+
+static void
+td_cb(int fd, short type, void *arg)
+{
+    char buf[1024];
+    struct td_cb_tq tmpq =  BTPDQ_HEAD_INITIALIZER(tmpq);
+    struct td_cb *cb, *next;
+
+    read(fd, buf, sizeof(buf));
+    td_acquire_lock();
+    BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next)
+        BTPDQ_INSERT_TAIL(&tmpq, cb, entry);
+    BTPDQ_INIT(&m_td_cbs);
+    td_release_lock();
+
+    BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) {
+        cb->cb(cb->arg);
+        free(cb);
+    }
+}
+
+static void
+td_init(void)
+{
+    int err;
+    int fds[2];
+    if (pipe(fds) == -1) {
+        btpd_err("Couldn't create thread callback pipe (%s).\n",
+            strerror(errno));
+    }
+    m_td_rd = fds[0];
+    m_td_wr = fds[1];
+    if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0)
+        btpd_err("Couldn't create mutex (%s).\n", strerror(err));
+
+    event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL);
+    event_add(&m_td_ev, NULL);
+}
+
+void ipc_init(void);
 
 void
 btpd_init(void)
@@ -171,6 +253,8 @@ btpd_init(void)
     for (int i = sizeof(BTPD_VERSION); i < 20; i++)
         m_peer_id[i] = rand_between(0, 255);
 
+    td_init();
+    http_init();
     net_init();
     //ipc_init();
     ul_init();
diff --git a/btpd/btpd.h b/btpd/btpd.h
index 68aaa0b..323c939 100644
--- a/btpd/btpd.h
+++ b/btpd/btpd.h
@@ -58,4 +58,11 @@ void btpd_del_torrent(struct torrent *tp);
 unsigned btpd_get_ntorrents(void);
 const uint8_t *btpd_get_peer_id(void);
 
+void td_acquire_lock(void);
+void td_release_lock(void);
+
+#define td_post_begin td_acquire_lock
+void td_post(void (*fun)(void *), void *arg);
+void td_post_end(void);
+
 #endif
diff --git a/btpd/http.c b/btpd/http.c
new file mode 100644
index 0000000..d01ec6b
--- /dev/null
+++ b/btpd/http.c
@@ -0,0 +1,245 @@
+#include <pthread.h>
+#include <stdarg.h>
+#include <string.h>
+#include <unistd.h>
+#include <curl/curl.h>
+
+#include "btpd.h"
+#include "http.h"
+
+#define MAX_DOWNLOAD (256 << 10)
+
+enum http_state {
+    HS_ADD,
+    HS_ACTIVE,
+    HS_DONE,
+    HS_NOADD,
+    HS_CANCEL
+};
+
+struct http {
+    enum http_state state;
+    char *url;
+    CURL *curlh;
+    struct http_res res;
+    BTPDQ_ENTRY(http) entry;
+    void (*cb)(struct http *, struct http_res *, void *);
+    void *cb_arg;
+};
+
+BTPDQ_HEAD(http_tq, http);
+
+static pthread_t m_httptd;
+static  struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq);
+static pthread_mutex_t m_httpq_lock;
+static pthread_cond_t m_httpq_cond;
+static CURLM *m_curlh;
+
+static size_t
+http_write_cb(void *ptr, size_t size, size_t nmemb, void *arg)
+{
+    char *mem;
+    struct http_res *res = arg;
+    size_t nbytes = size * nmemb;
+    size_t nlength = res->length + nbytes;
+    if (nlength > MAX_DOWNLOAD)
+        return 0;
+    if ((mem = realloc(res->content, nlength)) == NULL)
+        return 0;
+    res->content = mem;
+    bcopy(ptr, res->content + res->length, nbytes);
+    res->length = nlength;
+    return nbytes;
+}
+
+int
+http_get(struct http **ret,
+    void (*cb)(struct http *, struct http_res *, void *),
+    void *arg,
+    const char *fmt, ...)
+{
+    struct http *h = btpd_calloc(1, sizeof(*h));
+
+    h->state = HS_ADD;
+    h->cb = cb;
+    h->cb_arg = arg;
+    if ((h->curlh = curl_easy_init()) == NULL)
+        btpd_err("Fatal error in curl.\n");
+
+    va_list ap;
+    va_start(ap, fmt);
+    if (vasprintf(&h->url, fmt, ap) == -1)
+        btpd_err("Out of memory.\n");
+    va_end(ap);
+
+    curl_easy_setopt(h->curlh, CURLOPT_URL, h->url);
+    curl_easy_setopt(h->curlh, CURLOPT_USERAGENT, BTPD_VERSION);
+    curl_easy_setopt(h->curlh, CURLOPT_WRITEFUNCTION, http_write_cb);
+    curl_easy_setopt(h->curlh, CURLOPT_WRITEDATA, &h->res);
+    curl_easy_setopt(h->curlh, CURLOPT_FOLLOWLOCATION, 1);
+
+    pthread_mutex_lock(&m_httpq_lock);
+    BTPDQ_INSERT_TAIL(&m_httpq, h, entry);
+    pthread_mutex_unlock(&m_httpq_lock);
+    pthread_cond_signal(&m_httpq_cond);
+
+    if (ret != NULL)
+        *ret = h;
+
+    return 0;
+}
+
+void
+http_cancel(struct http *http)
+{
+    pthread_mutex_lock(&m_httpq_lock);
+    if (http->state == HS_ADD)
+        http->state = HS_NOADD;
+    else if (http->state == HS_ACTIVE)
+        http->state = HS_CANCEL;
+    pthread_mutex_unlock(&m_httpq_lock);
+}
+
+int
+http_succeeded(struct http_res *res)
+{
+    return res->res == HRES_OK && res->code >= 200 && res->code < 300;
+}
+
+static void
+http_td_cb(void *arg)
+{
+    struct http *h = arg;
+    if (h->res.res == HRES_OK)
+        curl_easy_getinfo(h->curlh, CURLINFO_RESPONSE_CODE, &h->res.code);
+    if (h->res.res == HRES_FAIL) {
+        btpd_log(BTPD_L_BTPD, "Http error for url '%s' (%s).\n", h->url,
+            curl_easy_strerror(h->res.code));
+    }
+    h->cb(h, &h->res, h->cb_arg);
+    curl_easy_cleanup(h->curlh);
+    if (h->res.content != NULL)
+        free(h->res.content);
+    free(h->url);
+    free(h);
+}
+
+static void
+http_td_actions(void)
+{
+    int has_posted = 0;
+    struct http *http, *next;
+    int nmsgs;
+    CURLMsg *cmsg;
+
+    BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) {
+        switch (http->state) {
+        case HS_ADD:
+            curl_multi_add_handle(m_curlh, http->curlh);
+            http->state = HS_ACTIVE;
+            break;
+        case HS_CANCEL:
+            curl_multi_remove_handle(m_curlh, http->curlh);
+        case HS_NOADD:
+            BTPDQ_REMOVE(&m_httpq, http, entry);
+            http->state = HS_CANCEL;
+            http->res.res = HRES_CANCEL;
+            if (!has_posted) {
+                has_posted = 1;
+                td_post_begin();
+            }
+            td_post(http_td_cb, http);
+            break;
+        case HS_DONE:
+            abort();
+        default:
+            break;
+        }
+    }
+
+    while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) {
+        BTPDQ_FOREACH(http, &m_httpq, entry) {
+            if (http->curlh == cmsg->easy_handle)
+                break;
+        }
+        assert(http != NULL);
+        BTPDQ_REMOVE(&m_httpq, http, entry);
+        http->state = HS_DONE;
+        if (cmsg->data.result == 0)
+            http->res.res = HRES_OK;
+        else {
+            http->res.res = HRES_FAIL;
+            http->res.code = cmsg->data.result;
+        }
+        curl_multi_remove_handle(m_curlh, http->curlh);
+        if (!has_posted) {
+            td_post_begin();
+            has_posted = 1;
+        }
+        td_post(http_td_cb, http);
+    }
+
+    if (has_posted)
+        td_post_end();
+}
+
+static void
+http_td_curl(void)
+{
+    fd_set rset, wset, eset;
+    int maxfd, nbusy;
+
+    pthread_mutex_unlock(&m_httpq_lock);
+
+    do {
+        while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(m_curlh, &nbusy))
+            ;
+
+        FD_ZERO(&rset);
+        FD_ZERO(&wset);
+        FD_ZERO(&eset);
+
+        curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd);
+        select(maxfd + 1, &rset, &wset, &eset, (& (struct timeval) { 2, 0}));
+
+        pthread_mutex_lock(&m_httpq_lock);
+        http_td_actions();
+        pthread_mutex_unlock(&m_httpq_lock);
+    } while (nbusy > 0);
+
+    pthread_mutex_lock(&m_httpq_lock);
+}
+
+static void *
+http_td(void *arg)
+{
+    pthread_mutex_lock(&m_httpq_lock);
+    for (;;) {
+        while (BTPDQ_EMPTY(&m_httpq))
+            pthread_cond_wait(&m_httpq_cond, &m_httpq_lock);
+        http_td_actions();
+        if (!BTPDQ_EMPTY(&m_httpq))
+            http_td_curl();
+    }
+}
+
+void
+http_init(void)
+{
+    int err;
+    if (curl_global_init(0))
+        goto curl_err;
+    if ((m_curlh = curl_multi_init()) == NULL)
+        goto curl_err;
+
+    err = pthread_mutex_init(&m_httpq_lock, NULL);
+    if (err == 0)
+        err = pthread_cond_init(&m_httpq_cond, NULL);
+    if (err == 0)
+        err = pthread_create(&m_httptd, NULL, http_td, NULL);
+    if (err != 0)
+        btpd_err("pthread failure (%s)\n", strerror(err));
+    return;
+curl_err:
+    btpd_err("Fatal error in curl.\n");
+}
diff --git a/btpd/http.h b/btpd/http.h
new file mode 100644
index 0000000..55a504f
--- /dev/null
+++ b/btpd/http.h
@@ -0,0 +1,29 @@
+#ifndef BTPD_HTTP_H
+#define BTPD_HTTP_H
+
+struct http;
+
+enum http_status {
+    HRES_OK,
+    HRES_FAIL,
+    HRES_CANCEL
+};
+
+struct http_res {
+    enum http_status res;
+    long code;
+    char *content;
+    size_t length;
+};
+
+int http_get(struct http **ret,
+    void (*cb)(struct http *, struct http_res *, void *),
+    void *arg,
+    const char *fmt, ...);
+void http_cancel(struct http *http);
+
+int http_succeeded(struct http_res *res);
+
+void http_init(void);
+
+#endif