summaryrefslogtreecommitdiff
path: root/libs/libcurl/src/mqtt.c
diff options
context:
space:
mode:
authordartraiden <wowemuh@gmail.com>2025-06-04 09:49:23 +0300
committerdartraiden <wowemuh@gmail.com>2025-06-04 10:27:11 +0300
commit86d1a677fd310d7d90d6f7545c02a4bd68e1d955 (patch)
tree7fd5f46ef18038d10dcdf9fa19ffee547d51d6ad /libs/libcurl/src/mqtt.c
parente8e2a816fbbcec0d6a64496928fecff19c281d82 (diff)
libcurl: update to 8.14.0
Diffstat (limited to 'libs/libcurl/src/mqtt.c')
-rw-r--r--libs/libcurl/src/mqtt.c258
1 files changed, 204 insertions, 54 deletions
diff --git a/libs/libcurl/src/mqtt.c b/libs/libcurl/src/mqtt.c
index eae4b4ceb7..568ca4ceab 100644
--- a/libs/libcurl/src/mqtt.c
+++ b/libs/libcurl/src/mqtt.c
@@ -37,7 +37,7 @@
#include "strdup.h"
#include "url.h"
#include "escape.h"
-#include "warnless.h"
+#include "curlx/warnless.h"
#include "curl_printf.h"
#include "curl_memory.h"
#include "multiif.h"
@@ -46,17 +46,59 @@
/* The last #include file should be: */
#include "memdebug.h"
+/* first byte is command.
+ second byte is for flags. */
#define MQTT_MSG_CONNECT 0x10
/* #define MQTT_MSG_CONNACK 0x20 */
#define MQTT_MSG_PUBLISH 0x30
#define MQTT_MSG_SUBSCRIBE 0x82
#define MQTT_MSG_SUBACK 0x90
#define MQTT_MSG_DISCONNECT 0xe0
+#define MQTT_MSG_PINGREQ 0xC0
+#define MQTT_MSG_PINGRESP 0xD0
#define MQTT_CONNACK_LEN 2
#define MQTT_SUBACK_LEN 3
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
+/* meta key for storing protocol meta at easy handle */
+#define CURL_META_MQTT_EASY "meta:proto:mqtt:easy"
+/* meta key for storing protocol meta at connection */
+#define CURL_META_MQTT_CONN "meta:proto:mqtt:conn"
+
+enum mqttstate {
+ MQTT_FIRST, /* 0 */
+ MQTT_REMAINING_LENGTH, /* 1 */
+ MQTT_CONNACK, /* 2 */
+ MQTT_SUBACK, /* 3 */
+ MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */
+ MQTT_PUBWAIT, /* 5 - wait for publish */
+ MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */
+
+ MQTT_NOSTATE /* 7 - never used an actual state */
+};
+
+struct mqtt_conn {
+ enum mqttstate state;
+ enum mqttstate nextstate; /* switch to this after remaining length is
+ done */
+ unsigned int packetid;
+};
+
+/* protocol-specific transfer-related data */
+struct MQTT {
+ struct dynbuf sendbuf;
+ /* when receiving */
+ struct dynbuf recvbuf;
+ size_t npacket; /* byte counter */
+ size_t remaining_length;
+ unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
+ struct curltime lastTime; /* last time we sent or received data */
+ unsigned char firstbyte;
+ BIT(pingsent); /* 1 while we wait for ping response */
+};
+
+
/*
* Forward declarations.
*/
@@ -99,45 +141,72 @@ const struct Curl_handler Curl_handler_mqtt = {
PROTOPT_NONE /* flags */
};
+static void mqtt_easy_dtor(void *key, size_t klen, void *entry)
+{
+ struct MQTT *mq = entry;
+ (void)key;
+ (void)klen;
+ curlx_dyn_free(&mq->sendbuf);
+ curlx_dyn_free(&mq->recvbuf);
+ free(mq);
+}
+
+static void mqtt_conn_dtor(void *key, size_t klen, void *entry)
+{
+ (void)key;
+ (void)klen;
+ free(entry);
+}
+
static CURLcode mqtt_setup_conn(struct Curl_easy *data,
struct connectdata *conn)
{
- /* allocate the HTTP-specific struct for the Curl_easy, only to survive
- during this request */
+ /* setup MQTT specific meta data at easy handle and connection */
+ struct mqtt_conn *mqtt;
struct MQTT *mq;
- (void)conn;
- DEBUGASSERT(data->req.p.mqtt == NULL);
+
+ mqtt = calloc(1, sizeof(*mqtt));
+ if(!mqtt ||
+ Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor))
+ return CURLE_OUT_OF_MEMORY;
mq = calloc(1, sizeof(struct MQTT));
if(!mq)
return CURLE_OUT_OF_MEMORY;
- Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
- Curl_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
- data->req.p.mqtt = mq;
+ curlx_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
+ curlx_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
+ if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor))
+ return CURLE_OUT_OF_MEMORY;
return CURLE_OK;
}
static CURLcode mqtt_send(struct Curl_easy *data,
const char *buf, size_t len)
{
- struct MQTT *mq = data->req.p.mqtt;
size_t n;
- CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n);
+ CURLcode result;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+
+ if(!mq)
+ return CURLE_FAILED_INIT;
+
+ result = Curl_xfer_send(data, buf, len, FALSE, &n);
if(result)
return result;
+ mq->lastTime = curlx_now();
Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
if(len != n) {
size_t nsend = len - n;
- if(Curl_dyn_len(&mq->sendbuf)) {
- DEBUGASSERT(Curl_dyn_len(&mq->sendbuf) >= nsend);
- result = Curl_dyn_tail(&mq->sendbuf, nsend); /* keep this much */
+ if(curlx_dyn_len(&mq->sendbuf)) {
+ DEBUGASSERT(curlx_dyn_len(&mq->sendbuf) >= nsend);
+ result = curlx_dyn_tail(&mq->sendbuf, nsend); /* keep this much */
}
else {
- result = Curl_dyn_addn(&mq->sendbuf, &buf[n], nsend);
+ result = curlx_dyn_addn(&mq->sendbuf, &buf[n], nsend);
}
}
else
- Curl_dyn_reset(&mq->sendbuf);
+ curlx_dyn_reset(&mq->sendbuf);
return result;
}
@@ -344,20 +413,19 @@ end:
static CURLcode mqtt_disconnect(struct Curl_easy *data)
{
- CURLcode result = CURLE_OK;
- struct MQTT *mq = data->req.p.mqtt;
- result = mqtt_send(data, "\xe0\x00", 2);
- Curl_dyn_free(&mq->sendbuf);
- Curl_dyn_free(&mq->recvbuf);
- return result;
+ return mqtt_send(data, "\xe0\x00", 2);
}
static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
{
- struct MQTT *mq = data->req.p.mqtt;
- size_t rlen = Curl_dyn_len(&mq->recvbuf);
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+ size_t rlen;
CURLcode result;
+ if(!mq)
+ return CURLE_FAILED_INIT;
+ rlen = curlx_dyn_len(&mq->recvbuf);
+
if(rlen < nbytes) {
unsigned char readbuf[1024];
ssize_t nread;
@@ -367,42 +435,49 @@ static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
if(result)
return result;
DEBUGASSERT(nread >= 0);
- if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread))
+ if(curlx_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread))
return CURLE_OUT_OF_MEMORY;
- rlen = Curl_dyn_len(&mq->recvbuf);
+ rlen = curlx_dyn_len(&mq->recvbuf);
}
return (rlen >= nbytes) ? CURLE_OK : CURLE_AGAIN;
}
static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
{
- struct MQTT *mq = data->req.p.mqtt;
- size_t rlen = Curl_dyn_len(&mq->recvbuf);
- if(rlen <= nbytes)
- Curl_dyn_reset(&mq->recvbuf);
- else
- Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+ DEBUGASSERT(mq);
+ if(mq) {
+ size_t rlen = curlx_dyn_len(&mq->recvbuf);
+ if(rlen <= nbytes)
+ curlx_dyn_reset(&mq->recvbuf);
+ else
+ curlx_dyn_tail(&mq->recvbuf, rlen - nbytes);
+ }
}
static CURLcode mqtt_verify_connack(struct Curl_easy *data)
{
- struct MQTT *mq = data->req.p.mqtt;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result;
char *ptr;
+ DEBUGASSERT(mq);
+ if(!mq)
+ return CURLE_FAILED_INIT;
+
result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
if(result)
goto fail;
/* verify CONNACK */
- DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
- ptr = Curl_dyn_ptr(&mq->recvbuf);
+ DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
+ ptr = curlx_dyn_ptr(&mq->recvbuf);
Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN);
if(ptr[0] != 0x00 || ptr[1] != 0x00) {
failf(data, "Expected %02x%02x but got %02x%02x",
0x00, 0x00, ptr[0], ptr[1]);
- Curl_dyn_reset(&mq->recvbuf);
+ curlx_dyn_reset(&mq->recvbuf);
result = CURLE_WEIRD_SERVER_REPLY;
goto fail;
}
@@ -439,12 +514,16 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data)
char encodedsize[4];
size_t n;
struct connectdata *conn = data->conn;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+
+ if(!mqtt)
+ return CURLE_FAILED_INIT;
result = mqtt_get_topic(data, &topic, &topiclen);
if(result)
goto fail;
- conn->proto.mqtt.packetid++;
+ mqtt->packetid++;
packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
+ 2 bytes topic length + QoS byte */
@@ -459,8 +538,8 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data)
packet[0] = MQTT_MSG_SUBSCRIBE;
memcpy(&packet[1], encodedsize, n);
- packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
- packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
+ packet[1 + n] = (mqtt->packetid >> 8) & 0xff;
+ packet[2 + n] = mqtt->packetid & 0xff;
packet[3 + n] = (topiclen >> 8) & 0xff;
packet[4 + n ] = topiclen & 0xff;
memcpy(&packet[5 + n], topic, topiclen);
@@ -479,25 +558,28 @@ fail:
*/
static CURLcode mqtt_verify_suback(struct Curl_easy *data)
{
- struct MQTT *mq = data->req.p.mqtt;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
struct connectdata *conn = data->conn;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
CURLcode result;
char *ptr;
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
+
result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
if(result)
goto fail;
/* verify SUBACK */
- DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
- ptr = Curl_dyn_ptr(&mq->recvbuf);
+ DEBUGASSERT(curlx_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
+ ptr = curlx_dyn_ptr(&mq->recvbuf);
Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN);
if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) ||
((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) ||
ptr[2] != 0x00) {
- Curl_dyn_reset(&mq->recvbuf);
+ curlx_dyn_reset(&mq->recvbuf);
result = CURLE_WEIRD_SERVER_REPLY;
goto fail;
}
@@ -601,7 +683,10 @@ static void mqstate(struct Curl_easy *data,
enum mqttstate nextstate) /* used if state == FIRST */
{
struct connectdata *conn = data->conn;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+ DEBUGASSERT(mqtt);
+ if(!mqtt)
+ return;
#ifdef DEBUGBUILD
infof(data, "%s (from %s) (next is %s)",
statenames[state],
@@ -620,10 +705,14 @@ static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
struct connectdata *conn = data->conn;
ssize_t nread;
size_t remlen;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
- struct MQTT *mq = data->req.p.mqtt;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
unsigned char packet;
+ DEBUGASSERT(mqtt);
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
+
switch(mqtt->state) {
MQTT_SUBACK_COMING:
case MQTT_SUBACK_COMING:
@@ -687,6 +776,9 @@ MQTT_SUBACK_COMING:
goto end;
}
+ /* we received something */
+ mq->lastTime = curlx_now();
+
/* if QoS is set, message contains packet id */
result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
if(result)
@@ -709,9 +801,15 @@ end:
static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
{
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
*done = FALSE; /* unconditionally */
+ if(!mq)
+ return CURLE_FAILED_INIT;
+ mq->lastTime = curlx_now();
+ mq->pingsent = FALSE;
+
result = mqtt_connect(data);
if(result) {
failf(data, "Error %d sending MQTT CONNECT request", result);
@@ -724,32 +822,73 @@ static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
static CURLcode mqtt_done(struct Curl_easy *data,
CURLcode status, bool premature)
{
- struct MQTT *mq = data->req.p.mqtt;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
(void)status;
(void)premature;
- Curl_dyn_free(&mq->sendbuf);
- Curl_dyn_free(&mq->recvbuf);
+ if(mq) {
+ curlx_dyn_free(&mq->sendbuf);
+ curlx_dyn_free(&mq->recvbuf);
+ }
return CURLE_OK;
}
+/* we ping regularly to avoid being disconnected by the server */
+static CURLcode mqtt_ping(struct Curl_easy *data)
+{
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+ CURLcode result = CURLE_OK;
+ struct connectdata *conn = data->conn;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
+
+ if(mqtt->state == MQTT_FIRST &&
+ !mq->pingsent &&
+ data->set.upkeep_interval_ms > 0) {
+ struct curltime t = curlx_now();
+ timediff_t diff = curlx_timediff(t, mq->lastTime);
+
+ if(diff > data->set.upkeep_interval_ms) {
+ /* 0xC0 is PINGREQ, and 0x00 is remaining length */
+ unsigned char packet[2] = { 0xC0, 0x00 };
+ size_t packetlen = sizeof(packet);
+
+ result = mqtt_send(data, (char *)packet, packetlen);
+ if(!result) {
+ mq->pingsent = TRUE;
+ }
+ infof(data, "mqtt_ping: sent ping request.");
+ }
+ }
+ return result;
+}
+
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
{
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
- struct mqtt_conn *mqtt = &data->conn->proto.mqtt;
- struct MQTT *mq = data->req.p.mqtt;
ssize_t nread;
unsigned char recvbyte;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN);
+
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
*done = FALSE;
- if(Curl_dyn_len(&mq->sendbuf)) {
+ if(curlx_dyn_len(&mq->sendbuf)) {
/* send the remainder of an outgoing packet */
- result = mqtt_send(data, Curl_dyn_ptr(&mq->sendbuf),
- Curl_dyn_len(&mq->sendbuf));
+ result = mqtt_send(data, curlx_dyn_ptr(&mq->sendbuf),
+ curlx_dyn_len(&mq->sendbuf));
if(result)
return result;
}
+ result = mqtt_ping(data);
+ if(result)
+ return result;
+
infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
switch(mqtt->state) {
case MQTT_FIRST:
@@ -764,6 +903,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
break;
}
Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1);
+
+ /* we received something */
+ mq->lastTime = curlx_now();
+
/* remember the first byte */
mq->npacket = 0;
mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
@@ -794,6 +937,13 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
infof(data, "Got DISCONNECT");
*done = TRUE;
}
+
+ /* ping response */
+ if(mq->firstbyte == MQTT_MSG_PINGRESP) {
+ infof(data, "Received ping response.");
+ mq->pingsent = FALSE;
+ mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
+ }
break;
case MQTT_CONNACK:
result = mqtt_verify_connack(data);