From 86d1a677fd310d7d90d6f7545c02a4bd68e1d955 Mon Sep 17 00:00:00 2001 From: dartraiden Date: Wed, 4 Jun 2025 09:49:23 +0300 Subject: libcurl: update to 8.14.0 --- libs/libcurl/src/mqtt.c | 258 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 204 insertions(+), 54 deletions(-) (limited to 'libs/libcurl/src/mqtt.c') diff --git a/libs/libcurl/src/mqtt.c b/libs/libcurl/src/mqtt.c index eae4b4ceb7..568ca4ceab 100644 --- a/libs/libcurl/src/mqtt.c +++ b/libs/libcurl/src/mqtt.c @@ -37,7 +37,7 @@ #include "strdup.h" #include "url.h" #include "escape.h" -#include "warnless.h" +#include "curlx/warnless.h" #include "curl_printf.h" #include "curl_memory.h" #include "multiif.h" @@ -46,17 +46,59 @@ /* The last #include file should be: */ #include "memdebug.h" +/* first byte is command. + second byte is for flags. */ #define MQTT_MSG_CONNECT 0x10 /* #define MQTT_MSG_CONNACK 0x20 */ #define MQTT_MSG_PUBLISH 0x30 #define MQTT_MSG_SUBSCRIBE 0x82 #define MQTT_MSG_SUBACK 0x90 #define MQTT_MSG_DISCONNECT 0xe0 +#define MQTT_MSG_PINGREQ 0xC0 +#define MQTT_MSG_PINGRESP 0xD0 #define MQTT_CONNACK_LEN 2 #define MQTT_SUBACK_LEN 3 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ +/* meta key for storing protocol meta at easy handle */ +#define CURL_META_MQTT_EASY "meta:proto:mqtt:easy" +/* meta key for storing protocol meta at connection */ +#define CURL_META_MQTT_CONN "meta:proto:mqtt:conn" + +enum mqttstate { + MQTT_FIRST, /* 0 */ + MQTT_REMAINING_LENGTH, /* 1 */ + MQTT_CONNACK, /* 2 */ + MQTT_SUBACK, /* 3 */ + MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */ + MQTT_PUBWAIT, /* 5 - wait for publish */ + MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */ + + MQTT_NOSTATE /* 7 - never used an actual state */ +}; + +struct mqtt_conn { + enum mqttstate state; + enum mqttstate nextstate; /* switch to this after remaining length is + done */ + unsigned int packetid; +}; + +/* protocol-specific transfer-related data */ +struct MQTT { + struct dynbuf sendbuf; + /* when receiving */ + struct dynbuf recvbuf; + size_t npacket; /* byte counter */ + size_t remaining_length; + unsigned char pkt_hd[4]; /* for decoding the arriving packet length */ + struct curltime lastTime; /* last time we sent or received data */ + unsigned char firstbyte; + BIT(pingsent); /* 1 while we wait for ping response */ +}; + + /* * Forward declarations. */ @@ -99,45 +141,72 @@ const struct Curl_handler Curl_handler_mqtt = { PROTOPT_NONE /* flags */ }; +static void mqtt_easy_dtor(void *key, size_t klen, void *entry) +{ + struct MQTT *mq = entry; + (void)key; + (void)klen; + curlx_dyn_free(&mq->sendbuf); + curlx_dyn_free(&mq->recvbuf); + free(mq); +} + +static void mqtt_conn_dtor(void *key, size_t klen, void *entry) +{ + (void)key; + (void)klen; + free(entry); +} + static CURLcode mqtt_setup_conn(struct Curl_easy *data, struct connectdata *conn) { - /* allocate the HTTP-specific struct for the Curl_easy, only to survive - during this request */ + /* setup MQTT specific meta data at easy handle and connection */ + struct mqtt_conn *mqtt; struct MQTT *mq; - (void)conn; - DEBUGASSERT(data->req.p.mqtt == NULL); + + mqtt = calloc(1, sizeof(*mqtt)); + if(!mqtt || + Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor)) + return CURLE_OUT_OF_MEMORY; mq = calloc(1, sizeof(struct MQTT)); if(!mq) return CURLE_OUT_OF_MEMORY; - Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); - Curl_dyn_init(&mq->sendbuf, DYN_MQTT_SEND); - data->req.p.mqtt = mq; + curlx_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); + curlx_dyn_init(&mq->sendbuf, DYN_MQTT_SEND); + if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor)) + return CURLE_OUT_OF_MEMORY; return CURLE_OK; } static CURLcode mqtt_send(struct Curl_easy *data, const char *buf, size_t len) { - struct MQTT *mq = data->req.p.mqtt; size_t n; - CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n); + CURLcode result; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + + if(!mq) + return CURLE_FAILED_INIT; + + result = Curl_xfer_send(data, buf, len, FALSE, &n); if(result) return result; + mq->lastTime = curlx_now(); Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); if(len != n) { size_t nsend = len - n; - if(Curl_dyn_len(&mq->sendbuf)) { - DEBUGASSERT(Curl_dyn_len(&mq->sendbuf) >= nsend); - result = Curl_dyn_tail(&mq->sendbuf, nsend); /* keep this much */ + if(curlx_dyn_len(&mq->sendbuf)) { + DEBUGASSERT(curlx_dyn_len(&mq->sendbuf) >= nsend); + result = curlx_dyn_tail(&mq->sendbuf, nsend); /* keep this much */ } else { - result = Curl_dyn_addn(&mq->sendbuf, &buf[n], nsend); + result = curlx_dyn_addn(&mq->sendbuf, &buf[n], nsend); } } else - Curl_dyn_reset(&mq->sendbuf); + curlx_dyn_reset(&mq->sendbuf); return result; } @@ -344,20 +413,19 @@ end: static CURLcode mqtt_disconnect(struct Curl_easy *data) { - CURLcode result = CURLE_OK; - struct MQTT *mq = data->req.p.mqtt; - result = mqtt_send(data, "\xe0\x00", 2); - Curl_dyn_free(&mq->sendbuf); - Curl_dyn_free(&mq->recvbuf); - return result; + return mqtt_send(data, "\xe0\x00", 2); } static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) { - struct MQTT *mq = data->req.p.mqtt; - size_t rlen = Curl_dyn_len(&mq->recvbuf); + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + size_t rlen; CURLcode result; + if(!mq) + return CURLE_FAILED_INIT; + rlen = curlx_dyn_len(&mq->recvbuf); + if(rlen < nbytes) { unsigned char readbuf[1024]; ssize_t nread; @@ -367,42 +435,49 @@ static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) if(result) return result; DEBUGASSERT(nread >= 0); - if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread)) + if(curlx_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread)) return CURLE_OUT_OF_MEMORY; - rlen = Curl_dyn_len(&mq->recvbuf); + rlen = curlx_dyn_len(&mq->recvbuf); } return (rlen >= nbytes) ? CURLE_OK : CURLE_AGAIN; } static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes) { - struct MQTT *mq = data->req.p.mqtt; - size_t rlen = Curl_dyn_len(&mq->recvbuf); - if(rlen <= nbytes) - Curl_dyn_reset(&mq->recvbuf); - else - Curl_dyn_tail(&mq->recvbuf, rlen - nbytes); + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + DEBUGASSERT(mq); + if(mq) { + size_t rlen = curlx_dyn_len(&mq->recvbuf); + if(rlen <= nbytes) + curlx_dyn_reset(&mq->recvbuf); + else + curlx_dyn_tail(&mq->recvbuf, rlen - nbytes); + } } static CURLcode mqtt_verify_connack(struct Curl_easy *data) { - struct MQTT *mq = data->req.p.mqtt; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); CURLcode result; char *ptr; + DEBUGASSERT(mq); + if(!mq) + return CURLE_FAILED_INIT; + result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN); if(result) goto fail; /* verify CONNACK */ - DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN); - ptr = Curl_dyn_ptr(&mq->recvbuf); + DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN); + ptr = curlx_dyn_ptr(&mq->recvbuf); Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN); if(ptr[0] != 0x00 || ptr[1] != 0x00) { failf(data, "Expected %02x%02x but got %02x%02x", 0x00, 0x00, ptr[0], ptr[1]); - Curl_dyn_reset(&mq->recvbuf); + curlx_dyn_reset(&mq->recvbuf); result = CURLE_WEIRD_SERVER_REPLY; goto fail; } @@ -439,12 +514,16 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data) char encodedsize[4]; size_t n; struct connectdata *conn = data->conn; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + + if(!mqtt) + return CURLE_FAILED_INIT; result = mqtt_get_topic(data, &topic, &topiclen); if(result) goto fail; - conn->proto.mqtt.packetid++; + mqtt->packetid++; packetlen = topiclen + 5; /* packetid + topic (has a two byte length field) + 2 bytes topic length + QoS byte */ @@ -459,8 +538,8 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data) packet[0] = MQTT_MSG_SUBSCRIBE; memcpy(&packet[1], encodedsize, n); - packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff; - packet[2 + n] = conn->proto.mqtt.packetid & 0xff; + packet[1 + n] = (mqtt->packetid >> 8) & 0xff; + packet[2 + n] = mqtt->packetid & 0xff; packet[3 + n] = (topiclen >> 8) & 0xff; packet[4 + n ] = topiclen & 0xff; memcpy(&packet[5 + n], topic, topiclen); @@ -479,25 +558,28 @@ fail: */ static CURLcode mqtt_verify_suback(struct Curl_easy *data) { - struct MQTT *mq = data->req.p.mqtt; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); struct connectdata *conn = data->conn; - struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); CURLcode result; char *ptr; + if(!mqtt || !mq) + return CURLE_FAILED_INIT; + result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN); if(result) goto fail; /* verify SUBACK */ - DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN); - ptr = Curl_dyn_ptr(&mq->recvbuf); + DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN); + ptr = curlx_dyn_ptr(&mq->recvbuf); Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN); if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) || ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) || ptr[2] != 0x00) { - Curl_dyn_reset(&mq->recvbuf); + curlx_dyn_reset(&mq->recvbuf); result = CURLE_WEIRD_SERVER_REPLY; goto fail; } @@ -601,7 +683,10 @@ static void mqstate(struct Curl_easy *data, enum mqttstate nextstate) /* used if state == FIRST */ { struct connectdata *conn = data->conn; - struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + DEBUGASSERT(mqtt); + if(!mqtt) + return; #ifdef DEBUGBUILD infof(data, "%s (from %s) (next is %s)", statenames[state], @@ -620,10 +705,14 @@ static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done) struct connectdata *conn = data->conn; ssize_t nread; size_t remlen; - struct mqtt_conn *mqtt = &conn->proto.mqtt; - struct MQTT *mq = data->req.p.mqtt; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); unsigned char packet; + DEBUGASSERT(mqtt); + if(!mqtt || !mq) + return CURLE_FAILED_INIT; + switch(mqtt->state) { MQTT_SUBACK_COMING: case MQTT_SUBACK_COMING: @@ -687,6 +776,9 @@ MQTT_SUBACK_COMING: goto end; } + /* we received something */ + mq->lastTime = curlx_now(); + /* if QoS is set, message contains packet id */ result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread); if(result) @@ -709,9 +801,15 @@ end: static CURLcode mqtt_do(struct Curl_easy *data, bool *done) { + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); CURLcode result = CURLE_OK; *done = FALSE; /* unconditionally */ + if(!mq) + return CURLE_FAILED_INIT; + mq->lastTime = curlx_now(); + mq->pingsent = FALSE; + result = mqtt_connect(data); if(result) { failf(data, "Error %d sending MQTT CONNECT request", result); @@ -724,32 +822,73 @@ static CURLcode mqtt_do(struct Curl_easy *data, bool *done) static CURLcode mqtt_done(struct Curl_easy *data, CURLcode status, bool premature) { - struct MQTT *mq = data->req.p.mqtt; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); (void)status; (void)premature; - Curl_dyn_free(&mq->sendbuf); - Curl_dyn_free(&mq->recvbuf); + if(mq) { + curlx_dyn_free(&mq->sendbuf); + curlx_dyn_free(&mq->recvbuf); + } return CURLE_OK; } +/* we ping regularly to avoid being disconnected by the server */ +static CURLcode mqtt_ping(struct Curl_easy *data) +{ + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + CURLcode result = CURLE_OK; + struct connectdata *conn = data->conn; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + + if(!mqtt || !mq) + return CURLE_FAILED_INIT; + + if(mqtt->state == MQTT_FIRST && + !mq->pingsent && + data->set.upkeep_interval_ms > 0) { + struct curltime t = curlx_now(); + timediff_t diff = curlx_timediff(t, mq->lastTime); + + if(diff > data->set.upkeep_interval_ms) { + /* 0xC0 is PINGREQ, and 0x00 is remaining length */ + unsigned char packet[2] = { 0xC0, 0x00 }; + size_t packetlen = sizeof(packet); + + result = mqtt_send(data, (char *)packet, packetlen); + if(!result) { + mq->pingsent = TRUE; + } + infof(data, "mqtt_ping: sent ping request."); + } + } + return result; +} + static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) { + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); CURLcode result = CURLE_OK; - struct mqtt_conn *mqtt = &data->conn->proto.mqtt; - struct MQTT *mq = data->req.p.mqtt; ssize_t nread; unsigned char recvbyte; + struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN); + + if(!mqtt || !mq) + return CURLE_FAILED_INIT; *done = FALSE; - if(Curl_dyn_len(&mq->sendbuf)) { + if(curlx_dyn_len(&mq->sendbuf)) { /* send the remainder of an outgoing packet */ - result = mqtt_send(data, Curl_dyn_ptr(&mq->sendbuf), - Curl_dyn_len(&mq->sendbuf)); + result = mqtt_send(data, curlx_dyn_ptr(&mq->sendbuf), + curlx_dyn_len(&mq->sendbuf)); if(result) return result; } + result = mqtt_ping(data); + if(result) + return result; + infof(data, "mqtt_doing: state [%d]", (int) mqtt->state); switch(mqtt->state) { case MQTT_FIRST: @@ -764,6 +903,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) break; } Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1); + + /* we received something */ + mq->lastTime = curlx_now(); + /* remember the first byte */ mq->npacket = 0; mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); @@ -794,6 +937,13 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) infof(data, "Got DISCONNECT"); *done = TRUE; } + + /* ping response */ + if(mq->firstbyte == MQTT_MSG_PINGRESP) { + infof(data, "Received ping response."); + mq->pingsent = FALSE; + mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); + } break; case MQTT_CONNACK: result = mqtt_verify_connack(data); -- cgit v1.2.3