summaryrefslogtreecommitdiff
path: root/libs/libcurl/src/multi.c
diff options
context:
space:
mode:
Diffstat (limited to 'libs/libcurl/src/multi.c')
-rw-r--r--libs/libcurl/src/multi.c231
1 files changed, 187 insertions, 44 deletions
diff --git a/libs/libcurl/src/multi.c b/libs/libcurl/src/multi.c
index 3c7fb85ed8..4cc7c5ae61 100644
--- a/libs/libcurl/src/multi.c
+++ b/libs/libcurl/src/multi.c
@@ -190,7 +190,7 @@ static void mstate(struct Curl_easy *data, CURLMstate state
*/
struct Curl_sh_entry {
- struct curl_hash transfers; /* hash of transfers using this socket */
+ struct Curl_hash transfers; /* hash of transfers using this socket */
unsigned int action; /* what combined action READ/WRITE this socket waits
for */
void *socketp; /* settable by users with curl_multi_assign() */
@@ -204,7 +204,7 @@ struct Curl_sh_entry {
#define SH_WRITE 2
/* look up a given socket in the socket hash, skip invalid sockets */
-static struct Curl_sh_entry *sh_getentry(struct curl_hash *sh,
+static struct Curl_sh_entry *sh_getentry(struct Curl_hash *sh,
curl_socket_t s)
{
if(s != CURL_SOCKET_BAD) {
@@ -238,7 +238,7 @@ static void trhash_dtor(void *nada)
/* make sure this socket is present in the hash for this handle */
-static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh,
+static struct Curl_sh_entry *sh_addentry(struct Curl_hash *sh,
curl_socket_t s)
{
struct Curl_sh_entry *there = sh_getentry(sh, s);
@@ -273,7 +273,7 @@ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh,
/* delete the given socket + handle from the hash */
static void sh_delentry(struct Curl_sh_entry *entry,
- struct curl_hash *sh, curl_socket_t s)
+ struct Curl_hash *sh, curl_socket_t s)
{
Curl_hash_destroy(&entry->transfers);
@@ -325,7 +325,7 @@ static size_t hash_fd(void *key, size_t key_length, size_t slots_num)
* per call."
*
*/
-static int sh_init(struct curl_hash *hash, int hashsize)
+static int sh_init(struct Curl_hash *hash, int hashsize)
{
return Curl_hash_init(hash, hashsize, hash_fd, fd_key_compare,
sh_freeentry);
@@ -374,6 +374,11 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
multi->max_concurrent_streams = 100;
multi->ipv6_works = Curl_ipv6works(NULL);
+#ifdef USE_WINSOCK
+ multi->wsa_event = WSACreateEvent();
+ if(multi->wsa_event == WSA_INVALID_EVENT)
+ goto error;
+#else
#ifdef ENABLE_WAKEUP
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, multi->wakeup_pair) < 0) {
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
@@ -387,6 +392,7 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
}
#endif
+#endif
return multi;
@@ -716,7 +722,7 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi,
struct Curl_easy *easy = data;
bool premature;
bool easy_owns_conn;
- struct curl_llist_element *e;
+ struct Curl_llist_element *e;
/* First, make some basic checks that the CURLM handle is a good handle */
if(!GOOD_MULTI_HANDLE(multi))
@@ -1081,11 +1087,17 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
unsigned int i;
unsigned int nfds = 0;
unsigned int curlfds;
- bool ufds_malloc = FALSE;
long timeout_internal;
int retcode = 0;
+#ifndef USE_WINSOCK
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
struct pollfd *ufds = &a_few_on_stack[0];
+ bool ufds_malloc = FALSE;
+#else
+ struct pollfd pre_poll;
+ WSANETWORKEVENTS wsa_events;
+ DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
+#endif
if(!GOOD_MULTI_HANDLE(multi))
return CURLM_BAD_HANDLE;
@@ -1131,11 +1143,16 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
nfds += extra_nfds; /* add the externally provided ones */
#ifdef ENABLE_WAKEUP
+#ifdef USE_WINSOCK
+ if(use_wakeup) {
+#else
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
+#endif
++nfds;
}
#endif
+#ifndef USE_WINSOCK
if(nfds > NUM_POLLS_ON_STACK) {
/* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
big, so at 2^29 sockets this value might wrap. When a process gets
@@ -1146,7 +1163,9 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
return CURLM_OUT_OF_MEMORY;
ufds_malloc = TRUE;
}
+
nfds = 0;
+#endif
/* only do the second loop if we found descriptors in the first stage run
above */
@@ -1157,24 +1176,42 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
while(data) {
bitmap = multi_getsock(data, sockbunch);
- for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) {
+ for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) {
curl_socket_t s = CURL_SOCKET_BAD;
-
+#ifdef USE_WINSOCK
+ long mask = 0;
+#endif
if(bitmap & GETSOCK_READSOCK(i)) {
+#ifdef USE_WINSOCK
+ if(timeout_ms && SOCKET_READABLE(sockbunch[i], 0) > 0)
+ timeout_ms = 0;
+ mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
+#else
ufds[nfds].fd = sockbunch[i];
ufds[nfds].events = POLLIN;
++nfds;
+#endif
s = sockbunch[i];
}
if(bitmap & GETSOCK_WRITESOCK(i)) {
+#ifdef USE_WINSOCK
+ if(timeout_ms && SOCKET_WRITABLE(sockbunch[i], 0) > 0)
+ timeout_ms = 0;
+ mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
+#else
ufds[nfds].fd = sockbunch[i];
ufds[nfds].events = POLLOUT;
++nfds;
+#endif
s = sockbunch[i];
}
if(s == CURL_SOCKET_BAD) {
break;
}
+#ifdef USE_WINSOCK
+ if(WSAEventSelect(s, multi->wsa_event, mask) != 0)
+ return CURLM_INTERNAL_ERROR;
+#endif
}
data = data->next; /* check next handle */
@@ -1183,6 +1220,37 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
/* Add external file descriptions from poll-like struct curl_waitfd */
for(i = 0; i < extra_nfds; i++) {
+#ifdef USE_WINSOCK
+ long mask = 0;
+ extra_fds[i].revents = 0;
+ pre_poll.fd = extra_fds[i].fd;
+ pre_poll.events = 0;
+ pre_poll.revents = 0;
+ if(extra_fds[i].events & CURL_WAIT_POLLIN) {
+ mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
+ pre_poll.events |= POLLIN;
+ }
+ if(extra_fds[i].events & CURL_WAIT_POLLPRI) {
+ mask |= FD_OOB;
+ pre_poll.events |= POLLPRI;
+ }
+ if(extra_fds[i].events & CURL_WAIT_POLLOUT) {
+ mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
+ pre_poll.events |= POLLOUT;
+ }
+ if(Curl_poll(&pre_poll, 1, 0) > 0) {
+ if(pre_poll.revents & POLLIN)
+ extra_fds[i].revents |= CURL_WAIT_POLLIN;
+ if(pre_poll.revents & POLLPRI)
+ extra_fds[i].revents |= CURL_WAIT_POLLPRI;
+ if(pre_poll.revents & POLLOUT)
+ extra_fds[i].revents |= CURL_WAIT_POLLOUT;
+ if(extra_fds[i].revents)
+ timeout_ms = 0;
+ }
+ if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, mask) != 0)
+ return CURLM_INTERNAL_ERROR;
+#else
ufds[nfds].fd = extra_fds[i].fd;
ufds[nfds].events = 0;
if(extra_fds[i].events & CURL_WAIT_POLLIN)
@@ -1192,28 +1260,57 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
if(extra_fds[i].events & CURL_WAIT_POLLOUT)
ufds[nfds].events |= POLLOUT;
++nfds;
+#endif
}
#ifdef ENABLE_WAKEUP
+#ifndef USE_WINSOCK
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
ufds[nfds].fd = multi->wakeup_pair[0];
ufds[nfds].events = POLLIN;
++nfds;
}
#endif
+#endif
if(nfds) {
- int pollrc;
/* wait... */
- pollrc = Curl_poll(ufds, nfds, timeout_ms);
+#ifdef USE_WINSOCK
+ WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, timeout_ms, FALSE);
+#else
+ int pollrc = Curl_poll(ufds, nfds, timeout_ms);
+#endif
+#ifdef USE_WINSOCK
+ /* With Winsock, we have to run this unconditionally to call
+ WSAEventSelect(fd, event, 0) on all the sockets */
+ {
+ retcode = 0;
+#else
if(pollrc > 0) {
retcode = pollrc;
+#endif
/* copy revents results from the poll to the curl_multi_wait poll
struct, the bit values of the actual underlying poll() implementation
may not be the same as the ones in the public libcurl API! */
for(i = 0; i < extra_nfds; i++) {
unsigned short mask = 0;
+#ifdef USE_WINSOCK
+ wsa_events.lNetworkEvents = 0;
+ mask = extra_fds[i].revents;
+ if(WSAEnumNetworkEvents(extra_fds[i].fd, multi->wsa_event,
+ &wsa_events) == 0) {
+ if(wsa_events.lNetworkEvents & (FD_READ|FD_ACCEPT|FD_CLOSE))
+ mask |= CURL_WAIT_POLLIN;
+ if(wsa_events.lNetworkEvents & (FD_WRITE|FD_CONNECT|FD_CLOSE))
+ mask |= CURL_WAIT_POLLOUT;
+ if(wsa_events.lNetworkEvents & FD_OOB)
+ mask |= CURL_WAIT_POLLPRI;
+ if(ret && wsa_events.lNetworkEvents != 0)
+ retcode++;
+ }
+ WSAEventSelect(extra_fds[i].fd, multi->wsa_event, 0);
+#else
unsigned r = ufds[curlfds + i].revents;
if(r & POLLIN)
@@ -1222,10 +1319,46 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
mask |= CURL_WAIT_POLLOUT;
if(r & POLLPRI)
mask |= CURL_WAIT_POLLPRI;
-
+#endif
extra_fds[i].revents = mask;
}
+#ifdef USE_WINSOCK
+ /* Count up all our own sockets that had activity,
+ and remove them from the event. */
+ if(curlfds) {
+ data = multi->easyp;
+ while(data) {
+ bitmap = multi_getsock(data, sockbunch);
+
+ for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) {
+ if(bitmap & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))) {
+ wsa_events.lNetworkEvents = 0;
+ if(WSAEnumNetworkEvents(sockbunch[i], multi->wsa_event,
+ &wsa_events) == 0) {
+ if(ret && wsa_events.lNetworkEvents != 0)
+ retcode++;
+ }
+ if(ret && !timeout_ms && wsa_events.lNetworkEvents == 0) {
+ if((bitmap & GETSOCK_READSOCK(i)) &&
+ SOCKET_READABLE(sockbunch[i], 0) > 0)
+ retcode++;
+ else if((bitmap & GETSOCK_WRITESOCK(i)) &&
+ SOCKET_WRITABLE(sockbunch[i], 0) > 0)
+ retcode++;
+ }
+ WSAEventSelect(sockbunch[i], multi->wsa_event, 0);
+ }
+ else
+ break;
+ }
+
+ data = data->next;
+ }
+ }
+
+ WSAResetEvent(multi->wsa_event);
+#else
#ifdef ENABLE_WAKEUP
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(ufds[curlfds + extra_nfds].revents & POLLIN) {
@@ -1238,10 +1371,8 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
when there is no more data, breaking the loop. */
nread = sread(multi->wakeup_pair[0], buf, sizeof(buf));
if(nread <= 0) {
-#ifndef USE_WINSOCK
if(nread < 0 && EINTR == SOCKERRNO)
continue;
-#endif
break;
}
}
@@ -1250,11 +1381,14 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi,
}
}
#endif
+#endif
}
}
+#ifndef USE_WINSOCK
if(ufds_malloc)
free(ufds);
+#endif
if(ret)
*ret = retcode;
if(!extrawait || nfds)
@@ -1309,6 +1443,10 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
return CURLM_BAD_HANDLE;
#ifdef ENABLE_WAKEUP
+#ifdef USE_WINSOCK
+ if(WSASetEvent(multi->wsa_event))
+ return CURLM_OK;
+#else
/* the wakeup_pair variable is only written during init and cleanup,
making it safe to access from another thread after the init part
and before cleanup */
@@ -1342,6 +1480,7 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
}
}
#endif
+#endif
return CURLM_WAKEUP_FAILURE;
}
@@ -1562,7 +1701,7 @@ CURLcode Curl_preconnect(struct Curl_easy *data)
static CURLMcode multi_runsingle(struct Curl_multi *multi,
- struct curltime now,
+ struct curltime *nowp,
struct Curl_easy *data)
{
struct Curl_message *msg = NULL;
@@ -1603,7 +1742,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
(data->mstate < CURLM_STATE_COMPLETED)) {
/* we need to wait for the connect state as only then is the start time
stored, but we must not check already completed handles */
- timeout_ms = Curl_timeleft(data, &now,
+ timeout_ms = Curl_timeleft(data, nowp,
(data->mstate <= CURLM_STATE_DO)?
TRUE:FALSE);
@@ -1612,25 +1751,25 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
if(data->mstate == CURLM_STATE_WAITRESOLVE)
failf(data, "Resolving timed out after %" CURL_FORMAT_TIMEDIFF_T
" milliseconds",
- Curl_timediff(now, data->progress.t_startsingle));
+ Curl_timediff(*nowp, data->progress.t_startsingle));
else if(data->mstate == CURLM_STATE_WAITCONNECT)
failf(data, "Connection timed out after %" CURL_FORMAT_TIMEDIFF_T
" milliseconds",
- Curl_timediff(now, data->progress.t_startsingle));
+ Curl_timediff(*nowp, data->progress.t_startsingle));
else {
struct SingleRequest *k = &data->req;
if(k->size != -1) {
failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T
" milliseconds with %" CURL_FORMAT_CURL_OFF_T " out of %"
CURL_FORMAT_CURL_OFF_T " bytes received",
- Curl_timediff(now, data->progress.t_startsingle),
+ Curl_timediff(*nowp, data->progress.t_startsingle),
k->bytecount, k->size);
}
else {
failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T
" milliseconds with %" CURL_FORMAT_CURL_OFF_T
" bytes received",
- Curl_timediff(now, data->progress.t_startsingle),
+ Curl_timediff(*nowp, data->progress.t_startsingle),
k->bytecount);
}
}
@@ -1655,7 +1794,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
if(!result) {
/* after init, go CONNECT */
multistate(data, CURLM_STATE_CONNECT);
- Curl_pgrsTime(data, TIMER_STARTOP);
+ *nowp = Curl_pgrsTime(data, TIMER_STARTOP);
rc = CURLM_CALL_MULTI_PERFORM;
}
break;
@@ -1672,7 +1811,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
if(result)
break;
- Curl_pgrsTime(data, TIMER_STARTSINGLE);
+ *nowp = Curl_pgrsTime(data, TIMER_STARTSINGLE);
if(data->set.timeout)
Curl_expire(data, data->set.timeout, EXPIRE_TIMEOUT);
@@ -2080,7 +2219,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
if(Curl_pgrsUpdate(data->conn))
result = CURLE_ABORTED_BY_CALLBACK;
else
- result = Curl_speedcheck(data, now);
+ result = Curl_speedcheck(data, *nowp);
if(!result) {
send_timeout_ms = 0;
@@ -2090,7 +2229,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
data->progress.ul_limit_size,
data->set.max_send_speed,
data->progress.ul_limit_start,
- now);
+ *nowp);
recv_timeout_ms = 0;
if(data->set.max_recv_speed > 0)
@@ -2099,11 +2238,11 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
data->progress.dl_limit_size,
data->set.max_recv_speed,
data->progress.dl_limit_start,
- now);
+ *nowp);
if(!send_timeout_ms && !recv_timeout_ms) {
multistate(data, CURLM_STATE_PERFORM);
- Curl_ratelimit(data, now);
+ Curl_ratelimit(data, *nowp);
}
else if(send_timeout_ms >= recv_timeout_ms)
Curl_expire(data, send_timeout_ms, EXPIRE_TOOFAST);
@@ -2125,7 +2264,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
data->progress.ul_limit_size,
data->set.max_send_speed,
data->progress.ul_limit_start,
- now);
+ *nowp);
/* check if over recv speed */
recv_timeout_ms = 0;
@@ -2134,10 +2273,10 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
data->progress.dl_limit_size,
data->set.max_recv_speed,
data->progress.dl_limit_start,
- now);
+ *nowp);
if(send_timeout_ms || recv_timeout_ms) {
- Curl_ratelimit(data, now);
+ Curl_ratelimit(data, *nowp);
multistate(data, CURLM_STATE_TOOFAST);
if(send_timeout_ms >= recv_timeout_ms)
Curl_expire(data, send_timeout_ms, EXPIRE_TOOFAST);
@@ -2417,7 +2556,7 @@ CURLMcode curl_multi_perform(struct Curl_multi *multi, int *running_handles)
SIGPIPE_VARIABLE(pipe_st);
sigpipe_ignore(data, &pipe_st);
- result = multi_runsingle(multi, now, data);
+ result = multi_runsingle(multi, &now, data);
sigpipe_restore(&pipe_st);
if(result)
@@ -2500,10 +2639,14 @@ CURLMcode curl_multi_cleanup(struct Curl_multi *multi)
Curl_hash_destroy(&multi->hostcache);
Curl_psl_destroy(&multi->psl);
+#ifdef USE_WINSOCK
+ WSACloseEvent(multi->wsa_event);
+#else
#ifdef ENABLE_WAKEUP
sclose(multi->wakeup_pair[0]);
sclose(multi->wakeup_pair[1]);
#endif
+#endif
free(multi);
return CURLM_OK;
@@ -2531,7 +2674,7 @@ CURLMsg *curl_multi_info_read(struct Curl_multi *multi, int *msgs_in_queue)
!multi->in_callback &&
Curl_llist_count(&multi->msglist)) {
/* there is one or more messages in the list */
- struct curl_llist_element *e;
+ struct Curl_llist_element *e;
/* extract the head of the list to return */
e = multi->msglist.head;
@@ -2761,15 +2904,15 @@ static CURLMcode add_next_timeout(struct curltime now,
struct Curl_easy *d)
{
struct curltime *tv = &d->state.expiretime;
- struct curl_llist *list = &d->state.timeoutlist;
- struct curl_llist_element *e;
+ struct Curl_llist *list = &d->state.timeoutlist;
+ struct Curl_llist_element *e;
struct time_node *node = NULL;
/* move over the timeout list for this specific handle and remove all
timeouts that are now passed tense and store the next pending
timeout in *tv */
for(e = list->head; e;) {
- struct curl_llist_element *n = e->next;
+ struct Curl_llist_element *n = e->next;
timediff_t diff;
node = (struct time_node *)e->ptr;
diff = Curl_timediff(node->time, now);
@@ -2839,8 +2982,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
and just move on. */
;
else {
- struct curl_hash_iterator iter;
- struct curl_hash_element *he;
+ struct Curl_hash_iterator iter;
+ struct Curl_hash_element *he;
/* the socket can be shared by many transfers, iterate */
Curl_hash_start_iterate(&entry->transfers, &iter);
@@ -2887,7 +3030,7 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
SIGPIPE_VARIABLE(pipe_st);
sigpipe_ignore(data, &pipe_st);
- result = multi_runsingle(multi, now, data);
+ result = multi_runsingle(multi, &now, data);
sigpipe_restore(&pipe_st);
if(CURLM_OK >= result) {
@@ -3123,8 +3266,8 @@ void Curl_update_timer(struct Curl_multi *multi)
static void
multi_deltimeout(struct Curl_easy *data, expire_id eid)
{
- struct curl_llist_element *e;
- struct curl_llist *timeoutlist = &data->state.timeoutlist;
+ struct Curl_llist_element *e;
+ struct Curl_llist *timeoutlist = &data->state.timeoutlist;
/* find and remove the specific node from the list */
for(e = timeoutlist->head; e; e = e->next) {
struct time_node *n = (struct time_node *)e->ptr;
@@ -3147,11 +3290,11 @@ multi_addtimeout(struct Curl_easy *data,
struct curltime *stamp,
expire_id eid)
{
- struct curl_llist_element *e;
+ struct Curl_llist_element *e;
struct time_node *node;
- struct curl_llist_element *prev = NULL;
+ struct Curl_llist_element *prev = NULL;
size_t n;
- struct curl_llist *timeoutlist = &data->state.timeoutlist;
+ struct Curl_llist *timeoutlist = &data->state.timeoutlist;
node = &data->state.expires[eid];
@@ -3278,7 +3421,7 @@ void Curl_expire_clear(struct Curl_easy *data)
if(nowp->tv_sec || nowp->tv_usec) {
/* Since this is an cleared time, we must remove the previous entry from
the splay tree */
- struct curl_llist *list = &data->state.timeoutlist;
+ struct Curl_llist *list = &data->state.timeoutlist;
int rc;
rc = Curl_splayremovebyaddr(multi->timetree,
@@ -3349,7 +3492,7 @@ void Curl_multiuse_state(struct connectdata *conn,
static void process_pending_handles(struct Curl_multi *multi)
{
- struct curl_llist_element *e = multi->pending.head;
+ struct Curl_llist_element *e = multi->pending.head;
if(e) {
struct Curl_easy *data = e->ptr;