diff options
author | George Hazan <ghazan@miranda.im> | 2019-12-23 13:53:27 +0300 |
---|---|---|
committer | George Hazan <ghazan@miranda.im> | 2019-12-23 13:53:27 +0300 |
commit | a29c9194cf2d9f11839ea833872beab754e19527 (patch) | |
tree | 2a1ff6475dd77d41a1e6bf6acfb5be479c5543eb /libs/libmosquitto/src/loop.c | |
parent | c1358991022919836c77d630ee9a3719fff86ed0 (diff) |
libmosquitto - a helper for MQTT clients
Diffstat (limited to 'libs/libmosquitto/src/loop.c')
-rw-r--r-- | libs/libmosquitto/src/loop.c | 390 |
1 files changed, 390 insertions, 0 deletions
diff --git a/libs/libmosquitto/src/loop.c b/libs/libmosquitto/src/loop.c new file mode 100644 index 0000000000..2342c945aa --- /dev/null +++ b/libs/libmosquitto/src/loop.c @@ -0,0 +1,390 @@ +/* +Copyright (c) 2010-2019 Roger Light <roger@atchoo.org> + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include <errno.h> +#ifndef WIN32 +#include <sys/select.h> +#include <time.h> +#endif + +#include "mosquitto.h" +#include "mosquitto_internal.h" +#include "net_mosq.h" +#include "packet_mosq.h" +#include "socks_mosq.h" +#include "tls_mosq.h" +#include "util_mosq.h" + +#if !defined(WIN32) && !defined(__SYMBIAN32__) +#define HAVE_PSELECT +#endif + +int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets) +{ +#ifdef HAVE_PSELECT + struct timespec local_timeout; +#else + struct timeval local_timeout; +#endif + fd_set readfds, writefds; + int fdcount; + int rc; + char pairbuf; + int maxfd = 0; + time_t now; + + if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL; +#ifndef WIN32 + if(mosq->sock >= FD_SETSIZE || mosq->sockpairR >= FD_SETSIZE){ + return MOSQ_ERR_INVAL; + } +#endif + + FD_ZERO(&readfds); + FD_ZERO(&writefds); + if(mosq->sock != INVALID_SOCKET){ + maxfd = mosq->sock; + FD_SET(mosq->sock, &readfds); + pthread_mutex_lock(&mosq->current_out_packet_mutex); + pthread_mutex_lock(&mosq->out_packet_mutex); + if(mosq->out_packet || mosq->current_out_packet){ + FD_SET(mosq->sock, &writefds); + } +#ifdef WITH_TLS + if(mosq->ssl){ + if(mosq->want_write){ + FD_SET(mosq->sock, &writefds); + }else if(mosq->want_connect){ + /* Remove possible FD_SET from above, we don't want to check + * for writing if we are still connecting, unless want_write is + * definitely set. The presence of outgoing packets does not + * matter yet. */ + FD_CLR(mosq->sock, &writefds); + } + } +#endif + pthread_mutex_unlock(&mosq->out_packet_mutex); + pthread_mutex_unlock(&mosq->current_out_packet_mutex); + }else{ +#ifdef WITH_SRV + if(mosq->achan){ + pthread_mutex_lock(&mosq->state_mutex); + if(mosq->state == mosq_cs_connect_srv){ + rc = ares_fds(mosq->achan, &readfds, &writefds); + if(rc > maxfd){ + maxfd = rc; + } + }else{ + pthread_mutex_unlock(&mosq->state_mutex); + return MOSQ_ERR_NO_CONN; + } + pthread_mutex_unlock(&mosq->state_mutex); + } +#else + return MOSQ_ERR_NO_CONN; +#endif + } + if(mosq->sockpairR != INVALID_SOCKET){ + /* sockpairR is used to break out of select() before the timeout, on a + * call to publish() etc. */ + FD_SET(mosq->sockpairR, &readfds); + if(mosq->sockpairR > maxfd){ + maxfd = mosq->sockpairR; + } + } + + if(timeout < 0){ + timeout = 1000; + } + + now = mosquitto_time(); + if(mosq->next_msg_out && now + timeout/1000 > mosq->next_msg_out){ + timeout = (mosq->next_msg_out - now)*1000; + } + + if(timeout < 0){ + /* There has been a delay somewhere which means we should have already + * sent a message. */ + timeout = 0; + } + + local_timeout.tv_sec = timeout/1000; +#ifdef HAVE_PSELECT + local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6; +#else + local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000; +#endif + +#ifdef HAVE_PSELECT + fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL); +#else + fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout); +#endif + if(fdcount == -1){ +#ifdef WIN32 + errno = WSAGetLastError(); +#endif + if(errno == EINTR){ + return MOSQ_ERR_SUCCESS; + }else{ + return MOSQ_ERR_ERRNO; + } + }else{ + if(mosq->sock != INVALID_SOCKET){ + if(FD_ISSET(mosq->sock, &readfds)){ + rc = mosquitto_loop_read(mosq, max_packets); + if(rc || mosq->sock == INVALID_SOCKET){ + return rc; + } + } + if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){ +#ifndef WIN32 + if(read(mosq->sockpairR, &pairbuf, 1) == 0){ + } +#else + recv(mosq->sockpairR, &pairbuf, 1, 0); +#endif + /* Fake write possible, to stimulate output write even though + * we didn't ask for it, because at that point the publish or + * other command wasn't present. */ + if(mosq->sock != INVALID_SOCKET) + FD_SET(mosq->sock, &writefds); + } + if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){ +#ifdef WITH_TLS + if(mosq->want_connect){ + rc = net__socket_connect_tls(mosq); + if(rc) return rc; + }else +#endif + { + rc = mosquitto_loop_write(mosq, max_packets); + if(rc || mosq->sock == INVALID_SOCKET){ + return rc; + } + } + } + } +#ifdef WITH_SRV + if(mosq->achan){ + ares_process(mosq->achan, &readfds, &writefds); + } +#endif + } + return mosquitto_loop_misc(mosq); +} + + +int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets) +{ + int run = 1; + int rc; + unsigned int reconnects = 0; + unsigned long reconnect_delay; +#ifndef WIN32 + struct timespec req, rem; +#endif + + if(!mosq) return MOSQ_ERR_INVAL; + + if(mosq->state == mosq_cs_connect_async){ + mosquitto_reconnect(mosq); + } + + while(run){ + do{ + rc = mosquitto_loop(mosq, timeout, max_packets); + if (reconnects !=0 && rc == MOSQ_ERR_SUCCESS){ + reconnects = 0; + } + }while(run && rc == MOSQ_ERR_SUCCESS); + /* Quit after fatal errors. */ + switch(rc){ + case MOSQ_ERR_NOMEM: + case MOSQ_ERR_PROTOCOL: + case MOSQ_ERR_INVAL: + case MOSQ_ERR_NOT_FOUND: + case MOSQ_ERR_TLS: + case MOSQ_ERR_PAYLOAD_SIZE: + case MOSQ_ERR_NOT_SUPPORTED: + case MOSQ_ERR_AUTH: + case MOSQ_ERR_ACL_DENIED: + case MOSQ_ERR_UNKNOWN: + case MOSQ_ERR_EAI: + case MOSQ_ERR_PROXY: + return rc; + case MOSQ_ERR_ERRNO: + break; + } + if(errno == EPROTO){ + return rc; + } + do{ + rc = MOSQ_ERR_SUCCESS; + pthread_mutex_lock(&mosq->state_mutex); + if(mosq->state == mosq_cs_disconnecting){ + run = 0; + pthread_mutex_unlock(&mosq->state_mutex); + }else{ + pthread_mutex_unlock(&mosq->state_mutex); + + if(mosq->reconnect_delay_max > mosq->reconnect_delay){ + if(mosq->reconnect_exponential_backoff){ + reconnect_delay = mosq->reconnect_delay*(reconnects+1)*(reconnects+1); + }else{ + reconnect_delay = mosq->reconnect_delay*(reconnects+1); + } + }else{ + reconnect_delay = mosq->reconnect_delay; + } + + if(reconnect_delay > mosq->reconnect_delay_max){ + reconnect_delay = mosq->reconnect_delay_max; + }else{ + reconnects++; + } + +#ifdef WIN32 + Sleep(reconnect_delay*1000); +#else + req.tv_sec = reconnect_delay; + req.tv_nsec = 0; + while(nanosleep(&req, &rem) == -1 && errno == EINTR){ + req = rem; + } +#endif + + pthread_mutex_lock(&mosq->state_mutex); + if(mosq->state == mosq_cs_disconnecting){ + run = 0; + pthread_mutex_unlock(&mosq->state_mutex); + }else{ + pthread_mutex_unlock(&mosq->state_mutex); + rc = mosquitto_reconnect(mosq); + } + } + }while(run && rc != MOSQ_ERR_SUCCESS); + } + return rc; +} + + +int mosquitto_loop_misc(struct mosquitto *mosq) +{ + if(!mosq) return MOSQ_ERR_INVAL; + if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN; + + return mosquitto__check_keepalive(mosq); +} + + +static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc) +{ + if(rc){ + net__socket_close(mosq); + pthread_mutex_lock(&mosq->state_mutex); + if(mosq->state == mosq_cs_disconnecting){ + rc = MOSQ_ERR_SUCCESS; + } + pthread_mutex_unlock(&mosq->state_mutex); + pthread_mutex_lock(&mosq->callback_mutex); + if(mosq->on_disconnect){ + mosq->in_callback = true; + mosq->on_disconnect(mosq, mosq->userdata, rc); + mosq->in_callback = false; + } + if(mosq->on_disconnect_v5){ + mosq->in_callback = true; + mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL); + mosq->in_callback = false; + } + pthread_mutex_unlock(&mosq->callback_mutex); + return rc; + } + return rc; +} + + +int mosquitto_loop_read(struct mosquitto *mosq, int max_packets) +{ + int rc; + int i; + if(max_packets < 1) return MOSQ_ERR_INVAL; + +#ifdef WITH_TLS + if(mosq->want_connect){ + return net__socket_connect_tls(mosq); + } +#endif + + pthread_mutex_lock(&mosq->msgs_out.mutex); + max_packets = mosq->msgs_out.queue_len; + pthread_mutex_unlock(&mosq->msgs_out.mutex); + + pthread_mutex_lock(&mosq->msgs_in.mutex); + max_packets += mosq->msgs_in.queue_len; + pthread_mutex_unlock(&mosq->msgs_in.mutex); + + if(max_packets < 1) max_packets = 1; + /* Queue len here tells us how many messages are awaiting processing and + * have QoS > 0. We should try to deal with that many in this loop in order + * to keep up. */ + for(i=0; i<max_packets || SSL_DATA_PENDING(mosq); i++){ +#ifdef WITH_SOCKS + if(mosq->socks5_host){ + rc = socks5__read(mosq); + }else +#endif + { + rc = packet__read(mosq); + } + if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ + return mosquitto__loop_rc_handle(mosq, rc); + } + } + return rc; +} + + +int mosquitto_loop_write(struct mosquitto *mosq, int max_packets) +{ + int rc; + int i; + if(max_packets < 1) return MOSQ_ERR_INVAL; + + pthread_mutex_lock(&mosq->msgs_out.mutex); + max_packets = mosq->msgs_out.queue_len; + pthread_mutex_unlock(&mosq->msgs_out.mutex); + + pthread_mutex_lock(&mosq->msgs_in.mutex); + max_packets += mosq->msgs_in.queue_len; + pthread_mutex_unlock(&mosq->msgs_in.mutex); + + if(max_packets < 1) max_packets = 1; + /* Queue len here tells us how many messages are awaiting processing and + * have QoS > 0. We should try to deal with that many in this loop in order + * to keep up. */ + for(i=0; i<max_packets; i++){ + rc = packet__write(mosq); + if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){ + return mosquitto__loop_rc_handle(mosq, rc); + } + } + return rc; +} + |