diff options
Diffstat (limited to 'libs/libcurl/src/multi.c')
-rw-r--r-- | libs/libcurl/src/multi.c | 231 |
1 files changed, 187 insertions, 44 deletions
diff --git a/libs/libcurl/src/multi.c b/libs/libcurl/src/multi.c index 3c7fb85ed8..4cc7c5ae61 100644 --- a/libs/libcurl/src/multi.c +++ b/libs/libcurl/src/multi.c @@ -190,7 +190,7 @@ static void mstate(struct Curl_easy *data, CURLMstate state */ struct Curl_sh_entry { - struct curl_hash transfers; /* hash of transfers using this socket */ + struct Curl_hash transfers; /* hash of transfers using this socket */ unsigned int action; /* what combined action READ/WRITE this socket waits for */ void *socketp; /* settable by users with curl_multi_assign() */ @@ -204,7 +204,7 @@ struct Curl_sh_entry { #define SH_WRITE 2 /* look up a given socket in the socket hash, skip invalid sockets */ -static struct Curl_sh_entry *sh_getentry(struct curl_hash *sh, +static struct Curl_sh_entry *sh_getentry(struct Curl_hash *sh, curl_socket_t s) { if(s != CURL_SOCKET_BAD) { @@ -238,7 +238,7 @@ static void trhash_dtor(void *nada) /* make sure this socket is present in the hash for this handle */ -static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, +static struct Curl_sh_entry *sh_addentry(struct Curl_hash *sh, curl_socket_t s) { struct Curl_sh_entry *there = sh_getentry(sh, s); @@ -273,7 +273,7 @@ static struct Curl_sh_entry *sh_addentry(struct curl_hash *sh, /* delete the given socket + handle from the hash */ static void sh_delentry(struct Curl_sh_entry *entry, - struct curl_hash *sh, curl_socket_t s) + struct Curl_hash *sh, curl_socket_t s) { Curl_hash_destroy(&entry->transfers); @@ -325,7 +325,7 @@ static size_t hash_fd(void *key, size_t key_length, size_t slots_num) * per call." * */ -static int sh_init(struct curl_hash *hash, int hashsize) +static int sh_init(struct Curl_hash *hash, int hashsize) { return Curl_hash_init(hash, hashsize, hash_fd, fd_key_compare, sh_freeentry); @@ -374,6 +374,11 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */ multi->max_concurrent_streams = 100; multi->ipv6_works = Curl_ipv6works(NULL); +#ifdef USE_WINSOCK + multi->wsa_event = WSACreateEvent(); + if(multi->wsa_event == WSA_INVALID_EVENT) + goto error; +#else #ifdef ENABLE_WAKEUP if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, multi->wakeup_pair) < 0) { multi->wakeup_pair[0] = CURL_SOCKET_BAD; @@ -387,6 +392,7 @@ struct Curl_multi *Curl_multi_handle(int hashsize, /* socket hash */ multi->wakeup_pair[1] = CURL_SOCKET_BAD; } #endif +#endif return multi; @@ -716,7 +722,7 @@ CURLMcode curl_multi_remove_handle(struct Curl_multi *multi, struct Curl_easy *easy = data; bool premature; bool easy_owns_conn; - struct curl_llist_element *e; + struct Curl_llist_element *e; /* First, make some basic checks that the CURLM handle is a good handle */ if(!GOOD_MULTI_HANDLE(multi)) @@ -1081,11 +1087,17 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, unsigned int i; unsigned int nfds = 0; unsigned int curlfds; - bool ufds_malloc = FALSE; long timeout_internal; int retcode = 0; +#ifndef USE_WINSOCK struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK]; struct pollfd *ufds = &a_few_on_stack[0]; + bool ufds_malloc = FALSE; +#else + struct pollfd pre_poll; + WSANETWORKEVENTS wsa_events; + DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT); +#endif if(!GOOD_MULTI_HANDLE(multi)) return CURLM_BAD_HANDLE; @@ -1131,11 +1143,16 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, nfds += extra_nfds; /* add the externally provided ones */ #ifdef ENABLE_WAKEUP +#ifdef USE_WINSOCK + if(use_wakeup) { +#else if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { +#endif ++nfds; } #endif +#ifndef USE_WINSOCK if(nfds > NUM_POLLS_ON_STACK) { /* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes big, so at 2^29 sockets this value might wrap. When a process gets @@ -1146,7 +1163,9 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, return CURLM_OUT_OF_MEMORY; ufds_malloc = TRUE; } + nfds = 0; +#endif /* only do the second loop if we found descriptors in the first stage run above */ @@ -1157,24 +1176,42 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, while(data) { bitmap = multi_getsock(data, sockbunch); - for(i = 0; i< MAX_SOCKSPEREASYHANDLE; i++) { + for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) { curl_socket_t s = CURL_SOCKET_BAD; - +#ifdef USE_WINSOCK + long mask = 0; +#endif if(bitmap & GETSOCK_READSOCK(i)) { +#ifdef USE_WINSOCK + if(timeout_ms && SOCKET_READABLE(sockbunch[i], 0) > 0) + timeout_ms = 0; + mask |= FD_READ|FD_ACCEPT|FD_CLOSE; +#else ufds[nfds].fd = sockbunch[i]; ufds[nfds].events = POLLIN; ++nfds; +#endif s = sockbunch[i]; } if(bitmap & GETSOCK_WRITESOCK(i)) { +#ifdef USE_WINSOCK + if(timeout_ms && SOCKET_WRITABLE(sockbunch[i], 0) > 0) + timeout_ms = 0; + mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; +#else ufds[nfds].fd = sockbunch[i]; ufds[nfds].events = POLLOUT; ++nfds; +#endif s = sockbunch[i]; } if(s == CURL_SOCKET_BAD) { break; } +#ifdef USE_WINSOCK + if(WSAEventSelect(s, multi->wsa_event, mask) != 0) + return CURLM_INTERNAL_ERROR; +#endif } data = data->next; /* check next handle */ @@ -1183,6 +1220,37 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, /* Add external file descriptions from poll-like struct curl_waitfd */ for(i = 0; i < extra_nfds; i++) { +#ifdef USE_WINSOCK + long mask = 0; + extra_fds[i].revents = 0; + pre_poll.fd = extra_fds[i].fd; + pre_poll.events = 0; + pre_poll.revents = 0; + if(extra_fds[i].events & CURL_WAIT_POLLIN) { + mask |= FD_READ|FD_ACCEPT|FD_CLOSE; + pre_poll.events |= POLLIN; + } + if(extra_fds[i].events & CURL_WAIT_POLLPRI) { + mask |= FD_OOB; + pre_poll.events |= POLLPRI; + } + if(extra_fds[i].events & CURL_WAIT_POLLOUT) { + mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; + pre_poll.events |= POLLOUT; + } + if(Curl_poll(&pre_poll, 1, 0) > 0) { + if(pre_poll.revents & POLLIN) + extra_fds[i].revents |= CURL_WAIT_POLLIN; + if(pre_poll.revents & POLLPRI) + extra_fds[i].revents |= CURL_WAIT_POLLPRI; + if(pre_poll.revents & POLLOUT) + extra_fds[i].revents |= CURL_WAIT_POLLOUT; + if(extra_fds[i].revents) + timeout_ms = 0; + } + if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, mask) != 0) + return CURLM_INTERNAL_ERROR; +#else ufds[nfds].fd = extra_fds[i].fd; ufds[nfds].events = 0; if(extra_fds[i].events & CURL_WAIT_POLLIN) @@ -1192,28 +1260,57 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, if(extra_fds[i].events & CURL_WAIT_POLLOUT) ufds[nfds].events |= POLLOUT; ++nfds; +#endif } #ifdef ENABLE_WAKEUP +#ifndef USE_WINSOCK if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { ufds[nfds].fd = multi->wakeup_pair[0]; ufds[nfds].events = POLLIN; ++nfds; } #endif +#endif if(nfds) { - int pollrc; /* wait... */ - pollrc = Curl_poll(ufds, nfds, timeout_ms); +#ifdef USE_WINSOCK + WSAWaitForMultipleEvents(1, &multi->wsa_event, FALSE, timeout_ms, FALSE); +#else + int pollrc = Curl_poll(ufds, nfds, timeout_ms); +#endif +#ifdef USE_WINSOCK + /* With Winsock, we have to run this unconditionally to call + WSAEventSelect(fd, event, 0) on all the sockets */ + { + retcode = 0; +#else if(pollrc > 0) { retcode = pollrc; +#endif /* copy revents results from the poll to the curl_multi_wait poll struct, the bit values of the actual underlying poll() implementation may not be the same as the ones in the public libcurl API! */ for(i = 0; i < extra_nfds; i++) { unsigned short mask = 0; +#ifdef USE_WINSOCK + wsa_events.lNetworkEvents = 0; + mask = extra_fds[i].revents; + if(WSAEnumNetworkEvents(extra_fds[i].fd, multi->wsa_event, + &wsa_events) == 0) { + if(wsa_events.lNetworkEvents & (FD_READ|FD_ACCEPT|FD_CLOSE)) + mask |= CURL_WAIT_POLLIN; + if(wsa_events.lNetworkEvents & (FD_WRITE|FD_CONNECT|FD_CLOSE)) + mask |= CURL_WAIT_POLLOUT; + if(wsa_events.lNetworkEvents & FD_OOB) + mask |= CURL_WAIT_POLLPRI; + if(ret && wsa_events.lNetworkEvents != 0) + retcode++; + } + WSAEventSelect(extra_fds[i].fd, multi->wsa_event, 0); +#else unsigned r = ufds[curlfds + i].revents; if(r & POLLIN) @@ -1222,10 +1319,46 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, mask |= CURL_WAIT_POLLOUT; if(r & POLLPRI) mask |= CURL_WAIT_POLLPRI; - +#endif extra_fds[i].revents = mask; } +#ifdef USE_WINSOCK + /* Count up all our own sockets that had activity, + and remove them from the event. */ + if(curlfds) { + data = multi->easyp; + while(data) { + bitmap = multi_getsock(data, sockbunch); + + for(i = 0; i < MAX_SOCKSPEREASYHANDLE; i++) { + if(bitmap & (GETSOCK_READSOCK(i) | GETSOCK_WRITESOCK(i))) { + wsa_events.lNetworkEvents = 0; + if(WSAEnumNetworkEvents(sockbunch[i], multi->wsa_event, + &wsa_events) == 0) { + if(ret && wsa_events.lNetworkEvents != 0) + retcode++; + } + if(ret && !timeout_ms && wsa_events.lNetworkEvents == 0) { + if((bitmap & GETSOCK_READSOCK(i)) && + SOCKET_READABLE(sockbunch[i], 0) > 0) + retcode++; + else if((bitmap & GETSOCK_WRITESOCK(i)) && + SOCKET_WRITABLE(sockbunch[i], 0) > 0) + retcode++; + } + WSAEventSelect(sockbunch[i], multi->wsa_event, 0); + } + else + break; + } + + data = data->next; + } + } + + WSAResetEvent(multi->wsa_event); +#else #ifdef ENABLE_WAKEUP if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { if(ufds[curlfds + extra_nfds].revents & POLLIN) { @@ -1238,10 +1371,8 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, when there is no more data, breaking the loop. */ nread = sread(multi->wakeup_pair[0], buf, sizeof(buf)); if(nread <= 0) { -#ifndef USE_WINSOCK if(nread < 0 && EINTR == SOCKERRNO) continue; -#endif break; } } @@ -1250,11 +1381,14 @@ static CURLMcode Curl_multi_wait(struct Curl_multi *multi, } } #endif +#endif } } +#ifndef USE_WINSOCK if(ufds_malloc) free(ufds); +#endif if(ret) *ret = retcode; if(!extrawait || nfds) @@ -1309,6 +1443,10 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi) return CURLM_BAD_HANDLE; #ifdef ENABLE_WAKEUP +#ifdef USE_WINSOCK + if(WSASetEvent(multi->wsa_event)) + return CURLM_OK; +#else /* the wakeup_pair variable is only written during init and cleanup, making it safe to access from another thread after the init part and before cleanup */ @@ -1342,6 +1480,7 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi) } } #endif +#endif return CURLM_WAKEUP_FAILURE; } @@ -1562,7 +1701,7 @@ CURLcode Curl_preconnect(struct Curl_easy *data) static CURLMcode multi_runsingle(struct Curl_multi *multi, - struct curltime now, + struct curltime *nowp, struct Curl_easy *data) { struct Curl_message *msg = NULL; @@ -1603,7 +1742,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, (data->mstate < CURLM_STATE_COMPLETED)) { /* we need to wait for the connect state as only then is the start time stored, but we must not check already completed handles */ - timeout_ms = Curl_timeleft(data, &now, + timeout_ms = Curl_timeleft(data, nowp, (data->mstate <= CURLM_STATE_DO)? TRUE:FALSE); @@ -1612,25 +1751,25 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, if(data->mstate == CURLM_STATE_WAITRESOLVE) failf(data, "Resolving timed out after %" CURL_FORMAT_TIMEDIFF_T " milliseconds", - Curl_timediff(now, data->progress.t_startsingle)); + Curl_timediff(*nowp, data->progress.t_startsingle)); else if(data->mstate == CURLM_STATE_WAITCONNECT) failf(data, "Connection timed out after %" CURL_FORMAT_TIMEDIFF_T " milliseconds", - Curl_timediff(now, data->progress.t_startsingle)); + Curl_timediff(*nowp, data->progress.t_startsingle)); else { struct SingleRequest *k = &data->req; if(k->size != -1) { failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T " milliseconds with %" CURL_FORMAT_CURL_OFF_T " out of %" CURL_FORMAT_CURL_OFF_T " bytes received", - Curl_timediff(now, data->progress.t_startsingle), + Curl_timediff(*nowp, data->progress.t_startsingle), k->bytecount, k->size); } else { failf(data, "Operation timed out after %" CURL_FORMAT_TIMEDIFF_T " milliseconds with %" CURL_FORMAT_CURL_OFF_T " bytes received", - Curl_timediff(now, data->progress.t_startsingle), + Curl_timediff(*nowp, data->progress.t_startsingle), k->bytecount); } } @@ -1655,7 +1794,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, if(!result) { /* after init, go CONNECT */ multistate(data, CURLM_STATE_CONNECT); - Curl_pgrsTime(data, TIMER_STARTOP); + *nowp = Curl_pgrsTime(data, TIMER_STARTOP); rc = CURLM_CALL_MULTI_PERFORM; } break; @@ -1672,7 +1811,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, if(result) break; - Curl_pgrsTime(data, TIMER_STARTSINGLE); + *nowp = Curl_pgrsTime(data, TIMER_STARTSINGLE); if(data->set.timeout) Curl_expire(data, data->set.timeout, EXPIRE_TIMEOUT); @@ -2080,7 +2219,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, if(Curl_pgrsUpdate(data->conn)) result = CURLE_ABORTED_BY_CALLBACK; else - result = Curl_speedcheck(data, now); + result = Curl_speedcheck(data, *nowp); if(!result) { send_timeout_ms = 0; @@ -2090,7 +2229,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, data->progress.ul_limit_size, data->set.max_send_speed, data->progress.ul_limit_start, - now); + *nowp); recv_timeout_ms = 0; if(data->set.max_recv_speed > 0) @@ -2099,11 +2238,11 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, data->progress.dl_limit_size, data->set.max_recv_speed, data->progress.dl_limit_start, - now); + *nowp); if(!send_timeout_ms && !recv_timeout_ms) { multistate(data, CURLM_STATE_PERFORM); - Curl_ratelimit(data, now); + Curl_ratelimit(data, *nowp); } else if(send_timeout_ms >= recv_timeout_ms) Curl_expire(data, send_timeout_ms, EXPIRE_TOOFAST); @@ -2125,7 +2264,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, data->progress.ul_limit_size, data->set.max_send_speed, data->progress.ul_limit_start, - now); + *nowp); /* check if over recv speed */ recv_timeout_ms = 0; @@ -2134,10 +2273,10 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, data->progress.dl_limit_size, data->set.max_recv_speed, data->progress.dl_limit_start, - now); + *nowp); if(send_timeout_ms || recv_timeout_ms) { - Curl_ratelimit(data, now); + Curl_ratelimit(data, *nowp); multistate(data, CURLM_STATE_TOOFAST); if(send_timeout_ms >= recv_timeout_ms) Curl_expire(data, send_timeout_ms, EXPIRE_TOOFAST); @@ -2417,7 +2556,7 @@ CURLMcode curl_multi_perform(struct Curl_multi *multi, int *running_handles) SIGPIPE_VARIABLE(pipe_st); sigpipe_ignore(data, &pipe_st); - result = multi_runsingle(multi, now, data); + result = multi_runsingle(multi, &now, data); sigpipe_restore(&pipe_st); if(result) @@ -2500,10 +2639,14 @@ CURLMcode curl_multi_cleanup(struct Curl_multi *multi) Curl_hash_destroy(&multi->hostcache); Curl_psl_destroy(&multi->psl); +#ifdef USE_WINSOCK + WSACloseEvent(multi->wsa_event); +#else #ifdef ENABLE_WAKEUP sclose(multi->wakeup_pair[0]); sclose(multi->wakeup_pair[1]); #endif +#endif free(multi); return CURLM_OK; @@ -2531,7 +2674,7 @@ CURLMsg *curl_multi_info_read(struct Curl_multi *multi, int *msgs_in_queue) !multi->in_callback && Curl_llist_count(&multi->msglist)) { /* there is one or more messages in the list */ - struct curl_llist_element *e; + struct Curl_llist_element *e; /* extract the head of the list to return */ e = multi->msglist.head; @@ -2761,15 +2904,15 @@ static CURLMcode add_next_timeout(struct curltime now, struct Curl_easy *d) { struct curltime *tv = &d->state.expiretime; - struct curl_llist *list = &d->state.timeoutlist; - struct curl_llist_element *e; + struct Curl_llist *list = &d->state.timeoutlist; + struct Curl_llist_element *e; struct time_node *node = NULL; /* move over the timeout list for this specific handle and remove all timeouts that are now passed tense and store the next pending timeout in *tv */ for(e = list->head; e;) { - struct curl_llist_element *n = e->next; + struct Curl_llist_element *n = e->next; timediff_t diff; node = (struct time_node *)e->ptr; diff = Curl_timediff(node->time, now); @@ -2839,8 +2982,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi, and just move on. */ ; else { - struct curl_hash_iterator iter; - struct curl_hash_element *he; + struct Curl_hash_iterator iter; + struct Curl_hash_element *he; /* the socket can be shared by many transfers, iterate */ Curl_hash_start_iterate(&entry->transfers, &iter); @@ -2887,7 +3030,7 @@ static CURLMcode multi_socket(struct Curl_multi *multi, SIGPIPE_VARIABLE(pipe_st); sigpipe_ignore(data, &pipe_st); - result = multi_runsingle(multi, now, data); + result = multi_runsingle(multi, &now, data); sigpipe_restore(&pipe_st); if(CURLM_OK >= result) { @@ -3123,8 +3266,8 @@ void Curl_update_timer(struct Curl_multi *multi) static void multi_deltimeout(struct Curl_easy *data, expire_id eid) { - struct curl_llist_element *e; - struct curl_llist *timeoutlist = &data->state.timeoutlist; + struct Curl_llist_element *e; + struct Curl_llist *timeoutlist = &data->state.timeoutlist; /* find and remove the specific node from the list */ for(e = timeoutlist->head; e; e = e->next) { struct time_node *n = (struct time_node *)e->ptr; @@ -3147,11 +3290,11 @@ multi_addtimeout(struct Curl_easy *data, struct curltime *stamp, expire_id eid) { - struct curl_llist_element *e; + struct Curl_llist_element *e; struct time_node *node; - struct curl_llist_element *prev = NULL; + struct Curl_llist_element *prev = NULL; size_t n; - struct curl_llist *timeoutlist = &data->state.timeoutlist; + struct Curl_llist *timeoutlist = &data->state.timeoutlist; node = &data->state.expires[eid]; @@ -3278,7 +3421,7 @@ void Curl_expire_clear(struct Curl_easy *data) if(nowp->tv_sec || nowp->tv_usec) { /* Since this is an cleared time, we must remove the previous entry from the splay tree */ - struct curl_llist *list = &data->state.timeoutlist; + struct Curl_llist *list = &data->state.timeoutlist; int rc; rc = Curl_splayremovebyaddr(multi->timetree, @@ -3349,7 +3492,7 @@ void Curl_multiuse_state(struct connectdata *conn, static void process_pending_handles(struct Curl_multi *multi) { - struct curl_llist_element *e = multi->pending.head; + struct Curl_llist_element *e = multi->pending.head; if(e) { struct Curl_easy *data = e->ptr; |