summaryrefslogtreecommitdiff
path: root/libs/libmosquitto/src/loop.c
diff options
context:
space:
mode:
authorGeorge Hazan <ghazan@miranda.im>2019-12-23 13:53:27 +0300
committerGeorge Hazan <ghazan@miranda.im>2019-12-23 13:53:27 +0300
commita29c9194cf2d9f11839ea833872beab754e19527 (patch)
tree2a1ff6475dd77d41a1e6bf6acfb5be479c5543eb /libs/libmosquitto/src/loop.c
parentc1358991022919836c77d630ee9a3719fff86ed0 (diff)
libmosquitto - a helper for MQTT clients
Diffstat (limited to 'libs/libmosquitto/src/loop.c')
-rw-r--r--libs/libmosquitto/src/loop.c390
1 files changed, 390 insertions, 0 deletions
diff --git a/libs/libmosquitto/src/loop.c b/libs/libmosquitto/src/loop.c
new file mode 100644
index 0000000000..2342c945aa
--- /dev/null
+++ b/libs/libmosquitto/src/loop.c
@@ -0,0 +1,390 @@
+/*
+Copyright (c) 2010-2019 Roger Light <roger@atchoo.org>
+
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution.
+
+The Eclipse Public License is available at
+ http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at
+ http://www.eclipse.org/org/documents/edl-v10.php.
+
+Contributors:
+ Roger Light - initial implementation and documentation.
+*/
+
+#include "config.h"
+
+#include <errno.h>
+#ifndef WIN32
+#include <sys/select.h>
+#include <time.h>
+#endif
+
+#include "mosquitto.h"
+#include "mosquitto_internal.h"
+#include "net_mosq.h"
+#include "packet_mosq.h"
+#include "socks_mosq.h"
+#include "tls_mosq.h"
+#include "util_mosq.h"
+
+#if !defined(WIN32) && !defined(__SYMBIAN32__)
+#define HAVE_PSELECT
+#endif
+
+int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
+{
+#ifdef HAVE_PSELECT
+ struct timespec local_timeout;
+#else
+ struct timeval local_timeout;
+#endif
+ fd_set readfds, writefds;
+ int fdcount;
+ int rc;
+ char pairbuf;
+ int maxfd = 0;
+ time_t now;
+
+ if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
+#ifndef WIN32
+ if(mosq->sock >= FD_SETSIZE || mosq->sockpairR >= FD_SETSIZE){
+ return MOSQ_ERR_INVAL;
+ }
+#endif
+
+ FD_ZERO(&readfds);
+ FD_ZERO(&writefds);
+ if(mosq->sock != INVALID_SOCKET){
+ maxfd = mosq->sock;
+ FD_SET(mosq->sock, &readfds);
+ pthread_mutex_lock(&mosq->current_out_packet_mutex);
+ pthread_mutex_lock(&mosq->out_packet_mutex);
+ if(mosq->out_packet || mosq->current_out_packet){
+ FD_SET(mosq->sock, &writefds);
+ }
+#ifdef WITH_TLS
+ if(mosq->ssl){
+ if(mosq->want_write){
+ FD_SET(mosq->sock, &writefds);
+ }else if(mosq->want_connect){
+ /* Remove possible FD_SET from above, we don't want to check
+ * for writing if we are still connecting, unless want_write is
+ * definitely set. The presence of outgoing packets does not
+ * matter yet. */
+ FD_CLR(mosq->sock, &writefds);
+ }
+ }
+#endif
+ pthread_mutex_unlock(&mosq->out_packet_mutex);
+ pthread_mutex_unlock(&mosq->current_out_packet_mutex);
+ }else{
+#ifdef WITH_SRV
+ if(mosq->achan){
+ pthread_mutex_lock(&mosq->state_mutex);
+ if(mosq->state == mosq_cs_connect_srv){
+ rc = ares_fds(mosq->achan, &readfds, &writefds);
+ if(rc > maxfd){
+ maxfd = rc;
+ }
+ }else{
+ pthread_mutex_unlock(&mosq->state_mutex);
+ return MOSQ_ERR_NO_CONN;
+ }
+ pthread_mutex_unlock(&mosq->state_mutex);
+ }
+#else
+ return MOSQ_ERR_NO_CONN;
+#endif
+ }
+ if(mosq->sockpairR != INVALID_SOCKET){
+ /* sockpairR is used to break out of select() before the timeout, on a
+ * call to publish() etc. */
+ FD_SET(mosq->sockpairR, &readfds);
+ if(mosq->sockpairR > maxfd){
+ maxfd = mosq->sockpairR;
+ }
+ }
+
+ if(timeout < 0){
+ timeout = 1000;
+ }
+
+ now = mosquitto_time();
+ if(mosq->next_msg_out && now + timeout/1000 > mosq->next_msg_out){
+ timeout = (mosq->next_msg_out - now)*1000;
+ }
+
+ if(timeout < 0){
+ /* There has been a delay somewhere which means we should have already
+ * sent a message. */
+ timeout = 0;
+ }
+
+ local_timeout.tv_sec = timeout/1000;
+#ifdef HAVE_PSELECT
+ local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
+#else
+ local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
+#endif
+
+#ifdef HAVE_PSELECT
+ fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
+#else
+ fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
+#endif
+ if(fdcount == -1){
+#ifdef WIN32
+ errno = WSAGetLastError();
+#endif
+ if(errno == EINTR){
+ return MOSQ_ERR_SUCCESS;
+ }else{
+ return MOSQ_ERR_ERRNO;
+ }
+ }else{
+ if(mosq->sock != INVALID_SOCKET){
+ if(FD_ISSET(mosq->sock, &readfds)){
+ rc = mosquitto_loop_read(mosq, max_packets);
+ if(rc || mosq->sock == INVALID_SOCKET){
+ return rc;
+ }
+ }
+ if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
+#ifndef WIN32
+ if(read(mosq->sockpairR, &pairbuf, 1) == 0){
+ }
+#else
+ recv(mosq->sockpairR, &pairbuf, 1, 0);
+#endif
+ /* Fake write possible, to stimulate output write even though
+ * we didn't ask for it, because at that point the publish or
+ * other command wasn't present. */
+ if(mosq->sock != INVALID_SOCKET)
+ FD_SET(mosq->sock, &writefds);
+ }
+ if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){
+#ifdef WITH_TLS
+ if(mosq->want_connect){
+ rc = net__socket_connect_tls(mosq);
+ if(rc) return rc;
+ }else
+#endif
+ {
+ rc = mosquitto_loop_write(mosq, max_packets);
+ if(rc || mosq->sock == INVALID_SOCKET){
+ return rc;
+ }
+ }
+ }
+ }
+#ifdef WITH_SRV
+ if(mosq->achan){
+ ares_process(mosq->achan, &readfds, &writefds);
+ }
+#endif
+ }
+ return mosquitto_loop_misc(mosq);
+}
+
+
+int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
+{
+ int run = 1;
+ int rc;
+ unsigned int reconnects = 0;
+ unsigned long reconnect_delay;
+#ifndef WIN32
+ struct timespec req, rem;
+#endif
+
+ if(!mosq) return MOSQ_ERR_INVAL;
+
+ if(mosq->state == mosq_cs_connect_async){
+ mosquitto_reconnect(mosq);
+ }
+
+ while(run){
+ do{
+ rc = mosquitto_loop(mosq, timeout, max_packets);
+ if (reconnects !=0 && rc == MOSQ_ERR_SUCCESS){
+ reconnects = 0;
+ }
+ }while(run && rc == MOSQ_ERR_SUCCESS);
+ /* Quit after fatal errors. */
+ switch(rc){
+ case MOSQ_ERR_NOMEM:
+ case MOSQ_ERR_PROTOCOL:
+ case MOSQ_ERR_INVAL:
+ case MOSQ_ERR_NOT_FOUND:
+ case MOSQ_ERR_TLS:
+ case MOSQ_ERR_PAYLOAD_SIZE:
+ case MOSQ_ERR_NOT_SUPPORTED:
+ case MOSQ_ERR_AUTH:
+ case MOSQ_ERR_ACL_DENIED:
+ case MOSQ_ERR_UNKNOWN:
+ case MOSQ_ERR_EAI:
+ case MOSQ_ERR_PROXY:
+ return rc;
+ case MOSQ_ERR_ERRNO:
+ break;
+ }
+ if(errno == EPROTO){
+ return rc;
+ }
+ do{
+ rc = MOSQ_ERR_SUCCESS;
+ pthread_mutex_lock(&mosq->state_mutex);
+ if(mosq->state == mosq_cs_disconnecting){
+ run = 0;
+ pthread_mutex_unlock(&mosq->state_mutex);
+ }else{
+ pthread_mutex_unlock(&mosq->state_mutex);
+
+ if(mosq->reconnect_delay_max > mosq->reconnect_delay){
+ if(mosq->reconnect_exponential_backoff){
+ reconnect_delay = mosq->reconnect_delay*(reconnects+1)*(reconnects+1);
+ }else{
+ reconnect_delay = mosq->reconnect_delay*(reconnects+1);
+ }
+ }else{
+ reconnect_delay = mosq->reconnect_delay;
+ }
+
+ if(reconnect_delay > mosq->reconnect_delay_max){
+ reconnect_delay = mosq->reconnect_delay_max;
+ }else{
+ reconnects++;
+ }
+
+#ifdef WIN32
+ Sleep(reconnect_delay*1000);
+#else
+ req.tv_sec = reconnect_delay;
+ req.tv_nsec = 0;
+ while(nanosleep(&req, &rem) == -1 && errno == EINTR){
+ req = rem;
+ }
+#endif
+
+ pthread_mutex_lock(&mosq->state_mutex);
+ if(mosq->state == mosq_cs_disconnecting){
+ run = 0;
+ pthread_mutex_unlock(&mosq->state_mutex);
+ }else{
+ pthread_mutex_unlock(&mosq->state_mutex);
+ rc = mosquitto_reconnect(mosq);
+ }
+ }
+ }while(run && rc != MOSQ_ERR_SUCCESS);
+ }
+ return rc;
+}
+
+
+int mosquitto_loop_misc(struct mosquitto *mosq)
+{
+ if(!mosq) return MOSQ_ERR_INVAL;
+ if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
+
+ return mosquitto__check_keepalive(mosq);
+}
+
+
+static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
+{
+ if(rc){
+ net__socket_close(mosq);
+ pthread_mutex_lock(&mosq->state_mutex);
+ if(mosq->state == mosq_cs_disconnecting){
+ rc = MOSQ_ERR_SUCCESS;
+ }
+ pthread_mutex_unlock(&mosq->state_mutex);
+ pthread_mutex_lock(&mosq->callback_mutex);
+ if(mosq->on_disconnect){
+ mosq->in_callback = true;
+ mosq->on_disconnect(mosq, mosq->userdata, rc);
+ mosq->in_callback = false;
+ }
+ if(mosq->on_disconnect_v5){
+ mosq->in_callback = true;
+ mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
+ mosq->in_callback = false;
+ }
+ pthread_mutex_unlock(&mosq->callback_mutex);
+ return rc;
+ }
+ return rc;
+}
+
+
+int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
+{
+ int rc;
+ int i;
+ if(max_packets < 1) return MOSQ_ERR_INVAL;
+
+#ifdef WITH_TLS
+ if(mosq->want_connect){
+ return net__socket_connect_tls(mosq);
+ }
+#endif
+
+ pthread_mutex_lock(&mosq->msgs_out.mutex);
+ max_packets = mosq->msgs_out.queue_len;
+ pthread_mutex_unlock(&mosq->msgs_out.mutex);
+
+ pthread_mutex_lock(&mosq->msgs_in.mutex);
+ max_packets += mosq->msgs_in.queue_len;
+ pthread_mutex_unlock(&mosq->msgs_in.mutex);
+
+ if(max_packets < 1) max_packets = 1;
+ /* Queue len here tells us how many messages are awaiting processing and
+ * have QoS > 0. We should try to deal with that many in this loop in order
+ * to keep up. */
+ for(i=0; i<max_packets || SSL_DATA_PENDING(mosq); i++){
+#ifdef WITH_SOCKS
+ if(mosq->socks5_host){
+ rc = socks5__read(mosq);
+ }else
+#endif
+ {
+ rc = packet__read(mosq);
+ }
+ if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
+ return mosquitto__loop_rc_handle(mosq, rc);
+ }
+ }
+ return rc;
+}
+
+
+int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
+{
+ int rc;
+ int i;
+ if(max_packets < 1) return MOSQ_ERR_INVAL;
+
+ pthread_mutex_lock(&mosq->msgs_out.mutex);
+ max_packets = mosq->msgs_out.queue_len;
+ pthread_mutex_unlock(&mosq->msgs_out.mutex);
+
+ pthread_mutex_lock(&mosq->msgs_in.mutex);
+ max_packets += mosq->msgs_in.queue_len;
+ pthread_mutex_unlock(&mosq->msgs_in.mutex);
+
+ if(max_packets < 1) max_packets = 1;
+ /* Queue len here tells us how many messages are awaiting processing and
+ * have QoS > 0. We should try to deal with that many in this loop in order
+ * to keep up. */
+ for(i=0; i<max_packets; i++){
+ rc = packet__write(mosq);
+ if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
+ return mosquitto__loop_rc_handle(mosq, rc);
+ }
+ }
+ return rc;
+}
+