diff options
| -rw-r--r-- | btpd/Makefile.am | 1 | ||||
| -rw-r--r-- | btpd/btpd.c | 86 | ||||
| -rw-r--r-- | btpd/btpd.h | 7 | ||||
| -rw-r--r-- | btpd/http.c | 245 | ||||
| -rw-r--r-- | btpd/http.h | 29 |
5 files changed, 367 insertions, 1 deletions
diff --git a/btpd/Makefile.am b/btpd/Makefile.am index 967f889..cd3870f 100644 --- a/btpd/Makefile.am +++ b/btpd/Makefile.am @@ -3,6 +3,7 @@ btpd_SOURCES=\ btpd.c btpd.h\ content.c content.h\ download.c download_subr.c download.h\ + http.c http.h\ main.c\ net.c net.h\ net_buf.c net_buf.h\ diff --git a/btpd/btpd.c b/btpd/btpd.c index 516602e..146cba0 100644 --- a/btpd/btpd.c +++ b/btpd/btpd.c @@ -17,6 +17,7 @@ #include <getopt.h> #include <math.h> #include <locale.h> +#include <pthread.h> #include <pwd.h> #include <signal.h> #include <stdio.h> @@ -26,6 +27,7 @@ #include <unistd.h> #include "btpd.h" +#include "http.h" struct child { pid_t pid; @@ -160,7 +162,87 @@ load_library(void) free(entries); } -extern void ipc_init(void); +struct td_cb { + void (*cb)(void *); + void *arg; + BTPDQ_ENTRY(td_cb) entry; +}; + +BTPDQ_HEAD(td_cb_tq, td_cb); + +static int m_td_rd, m_td_wr; +static struct event m_td_ev; +static struct td_cb_tq m_td_cbs = BTPDQ_HEAD_INITIALIZER(m_td_cbs); +static pthread_mutex_t m_td_lock; + +void +td_acquire_lock(void) +{ + pthread_mutex_lock(&m_td_lock); +} + +void +td_release_lock(void) +{ + pthread_mutex_unlock(&m_td_lock); +} + +void +td_post(void (*fun)(void *), void *arg) +{ + struct td_cb *cb = btpd_calloc(1, sizeof(*cb)); + cb->cb = fun; + cb->arg = arg; + BTPDQ_INSERT_TAIL(&m_td_cbs, cb, entry); +} + +void +td_post_end(void) +{ + char c = '1'; + td_release_lock(); + write(m_td_wr, &c, sizeof(c)); +} + +static void +td_cb(int fd, short type, void *arg) +{ + char buf[1024]; + struct td_cb_tq tmpq = BTPDQ_HEAD_INITIALIZER(tmpq); + struct td_cb *cb, *next; + + read(fd, buf, sizeof(buf)); + td_acquire_lock(); + BTPDQ_FOREACH_MUTABLE(cb, &m_td_cbs, entry, next) + BTPDQ_INSERT_TAIL(&tmpq, cb, entry); + BTPDQ_INIT(&m_td_cbs); + td_release_lock(); + + BTPDQ_FOREACH_MUTABLE(cb, &tmpq, entry, next) { + cb->cb(cb->arg); + free(cb); + } +} + +static void +td_init(void) +{ + int err; + int fds[2]; + if (pipe(fds) == -1) { + btpd_err("Couldn't create thread callback pipe (%s).\n", + strerror(errno)); + } + m_td_rd = fds[0]; + m_td_wr = fds[1]; + if ((err = pthread_mutex_init(&m_td_lock, NULL)) != 0) + btpd_err("Couldn't create mutex (%s).\n", strerror(err)); + + event_set(&m_td_ev, m_td_rd, EV_READ|EV_PERSIST, td_cb, NULL); + event_add(&m_td_ev, NULL); +} + +void ipc_init(void); void btpd_init(void) @@ -171,6 +253,8 @@ btpd_init(void) for (int i = sizeof(BTPD_VERSION); i < 20; i++) m_peer_id[i] = rand_between(0, 255); + td_init(); + http_init(); net_init(); //ipc_init(); ul_init(); diff --git a/btpd/btpd.h b/btpd/btpd.h index 68aaa0b..323c939 100644 --- a/btpd/btpd.h +++ b/btpd/btpd.h @@ -58,4 +58,11 @@ void btpd_del_torrent(struct torrent *tp); unsigned btpd_get_ntorrents(void); const uint8_t *btpd_get_peer_id(void); +void td_acquire_lock(void); +void td_release_lock(void); + +#define td_post_begin td_acquire_lock +void td_post(void (*fun)(void *), void *arg); +void td_post_end(void); + #endif diff --git a/btpd/http.c b/btpd/http.c new file mode 100644 index 0000000..d01ec6b --- /dev/null +++ b/btpd/http.c @@ -0,0 +1,245 @@ +#include <pthread.h> +#include <stdarg.h> +#include <string.h> +#include <unistd.h> +#include <curl/curl.h> + +#include "btpd.h" +#include "http.h" + +#define MAX_DOWNLOAD (256 << 10) + +enum http_state { + HS_ADD, + HS_ACTIVE, + HS_DONE, + HS_NOADD, + HS_CANCEL +}; + +struct http { + enum http_state state; + char *url; + CURL *curlh; + struct http_res res; + BTPDQ_ENTRY(http) entry; + void (*cb)(struct http *, struct http_res *, void *); + void *cb_arg; +}; + +BTPDQ_HEAD(http_tq, http); + +static pthread_t m_httptd; +static struct http_tq m_httpq = BTPDQ_HEAD_INITIALIZER(m_httpq); +static pthread_mutex_t m_httpq_lock; +static pthread_cond_t m_httpq_cond; +static CURLM *m_curlh; + +static size_t +http_write_cb(void *ptr, size_t size, size_t nmemb, void *arg) +{ + char *mem; + struct http_res *res = arg; + size_t nbytes = size * nmemb; + size_t nlength = res->length + nbytes; + if (nlength > MAX_DOWNLOAD) + return 0; + if ((mem = realloc(res->content, nlength)) == NULL) + return 0; + res->content = mem; + bcopy(ptr, res->content + res->length, nbytes); + res->length = nlength; + return nbytes; +} + +int +http_get(struct http **ret, + void (*cb)(struct http *, struct http_res *, void *), + void *arg, + const char *fmt, ...) +{ + struct http *h = btpd_calloc(1, sizeof(*h)); + + h->state = HS_ADD; + h->cb = cb; + h->cb_arg = arg; + if ((h->curlh = curl_easy_init()) == NULL) + btpd_err("Fatal error in curl.\n"); + + va_list ap; + va_start(ap, fmt); + if (vasprintf(&h->url, fmt, ap) == -1) + btpd_err("Out of memory.\n"); + va_end(ap); + + curl_easy_setopt(h->curlh, CURLOPT_URL, h->url); + curl_easy_setopt(h->curlh, CURLOPT_USERAGENT, BTPD_VERSION); + curl_easy_setopt(h->curlh, CURLOPT_WRITEFUNCTION, http_write_cb); + curl_easy_setopt(h->curlh, CURLOPT_WRITEDATA, &h->res); + curl_easy_setopt(h->curlh, CURLOPT_FOLLOWLOCATION, 1); + + pthread_mutex_lock(&m_httpq_lock); + BTPDQ_INSERT_TAIL(&m_httpq, h, entry); + pthread_mutex_unlock(&m_httpq_lock); + pthread_cond_signal(&m_httpq_cond); + + if (ret != NULL) + *ret = h; + + return 0; +} + +void +http_cancel(struct http *http) +{ + pthread_mutex_lock(&m_httpq_lock); + if (http->state == HS_ADD) + http->state = HS_NOADD; + else if (http->state == HS_ACTIVE) + http->state = HS_CANCEL; + pthread_mutex_unlock(&m_httpq_lock); +} + +int +http_succeeded(struct http_res *res) +{ + return res->res == HRES_OK && res->code >= 200 && res->code < 300; +} + +static void +http_td_cb(void *arg) +{ + struct http *h = arg; + if (h->res.res == HRES_OK) + curl_easy_getinfo(h->curlh, CURLINFO_RESPONSE_CODE, &h->res.code); + if (h->res.res == HRES_FAIL) { + btpd_log(BTPD_L_BTPD, "Http error for url '%s' (%s).\n", h->url, + curl_easy_strerror(h->res.code)); + } + h->cb(h, &h->res, h->cb_arg); + curl_easy_cleanup(h->curlh); + if (h->res.content != NULL) + free(h->res.content); + free(h->url); + free(h); +} + +static void +http_td_actions(void) +{ + int has_posted = 0; + struct http *http, *next; + int nmsgs; + CURLMsg *cmsg; + + BTPDQ_FOREACH_MUTABLE(http, &m_httpq, entry, next) { + switch (http->state) { + case HS_ADD: + curl_multi_add_handle(m_curlh, http->curlh); + http->state = HS_ACTIVE; + break; + case HS_CANCEL: + curl_multi_remove_handle(m_curlh, http->curlh); + case HS_NOADD: + BTPDQ_REMOVE(&m_httpq, http, entry); + http->state = HS_CANCEL; + http->res.res = HRES_CANCEL; + if (!has_posted) { + has_posted = 1; + td_post_begin(); + } + td_post(http_td_cb, http); + break; + case HS_DONE: + abort(); + default: + break; + } + } + + while ((cmsg = curl_multi_info_read(m_curlh, &nmsgs)) != NULL) { + BTPDQ_FOREACH(http, &m_httpq, entry) { + if (http->curlh == cmsg->easy_handle) + break; + } + assert(http != NULL); + BTPDQ_REMOVE(&m_httpq, http, entry); + http->state = HS_DONE; + if (cmsg->data.result == 0) + http->res.res = HRES_OK; + else { + http->res.res = HRES_FAIL; + http->res.code = cmsg->data.result; + } + curl_multi_remove_handle(m_curlh, http->curlh); + if (!has_posted) { + td_post_begin(); + has_posted = 1; + } + td_post(http_td_cb, http); + } + + if (has_posted) + td_post_end(); +} + +static void +http_td_curl(void) +{ + fd_set rset, wset, eset; + int maxfd, nbusy; + + pthread_mutex_unlock(&m_httpq_lock); + + do { + while (CURLM_CALL_MULTI_PERFORM == curl_multi_perform(m_curlh, &nbusy)) + ; + + FD_ZERO(&rset); + FD_ZERO(&wset); + FD_ZERO(&eset); + + curl_multi_fdset(m_curlh, &rset, &wset, &eset, &maxfd); + select(maxfd + 1, &rset, &wset, &eset, (& (struct timeval) { 2, 0})); + + pthread_mutex_lock(&m_httpq_lock); + http_td_actions(); + pthread_mutex_unlock(&m_httpq_lock); + } while (nbusy > 0); + + pthread_mutex_lock(&m_httpq_lock); +} + +static void * +http_td(void *arg) +{ + pthread_mutex_lock(&m_httpq_lock); + for (;;) { + while (BTPDQ_EMPTY(&m_httpq)) + pthread_cond_wait(&m_httpq_cond, &m_httpq_lock); + http_td_actions(); + if (!BTPDQ_EMPTY(&m_httpq)) + http_td_curl(); + } +} + +void +http_init(void) +{ + int err; + if (curl_global_init(0)) + goto curl_err; + if ((m_curlh = curl_multi_init()) == NULL) + goto curl_err; + + err = pthread_mutex_init(&m_httpq_lock, NULL); + if (err == 0) + err = pthread_cond_init(&m_httpq_cond, NULL); + if (err == 0) + err = pthread_create(&m_httptd, NULL, http_td, NULL); + if (err != 0) + btpd_err("pthread failure (%s)\n", strerror(err)); + return; +curl_err: + btpd_err("Fatal error in curl.\n"); +} diff --git a/btpd/http.h b/btpd/http.h new file mode 100644 index 0000000..55a504f --- /dev/null +++ b/btpd/http.h @@ -0,0 +1,29 @@ +#ifndef BTPD_HTTP_H +#define BTPD_HTTP_H + +struct http; + +enum http_status { + HRES_OK, + HRES_FAIL, + HRES_CANCEL +}; + +struct http_res { + enum http_status res; + long code; + char *content; + size_t length; +}; + +int http_get(struct http **ret, + void (*cb)(struct http *, struct http_res *, void *), + void *arg, + const char *fmt, ...); +void http_cancel(struct http *http); + +int http_succeeded(struct http_res *res); + +void http_init(void); + +#endif |