summaryrefslogtreecommitdiff
path: root/protocols/Facebook/src/mqtt.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'protocols/Facebook/src/mqtt.cpp')
-rw-r--r--protocols/Facebook/src/mqtt.cpp88
1 files changed, 57 insertions, 31 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp
index 2d1e6a02e8..dd92795a4c 100644
--- a/protocols/Facebook/src/mqtt.cpp
+++ b/protocols/Facebook/src/mqtt.cpp
@@ -50,6 +50,33 @@ static uint8_t encodeType(int type)
}
}
+uint8_t* FacebookProto::doZip(size_t cbData, const void *pData, size_t &cbRes)
+{
+ size_t dataSize = cbData + 100;
+ uint8_t *pRes = (uint8_t *)mir_alloc(dataSize);
+
+ z_stream zStreamOut = {};
+ deflateInit(&zStreamOut, Z_BEST_COMPRESSION);
+ zStreamOut.avail_in = (unsigned)cbData;
+ zStreamOut.next_in = (uint8_t *)pData;
+ zStreamOut.avail_out = (unsigned)dataSize;
+ zStreamOut.next_out = (uint8_t *)pRes;
+
+ switch (deflate(&zStreamOut, Z_FINISH)) {
+ case Z_STREAM_END: debugLogA("Deflate: Z_STREAM_END"); break;
+ case Z_OK: debugLogA("Deflate: Z_OK"); break;
+ case Z_BUF_ERROR: debugLogA("Deflate: Z_BUF_ERROR"); break;
+ case Z_DATA_ERROR: debugLogA("Deflate: Z_DATA_ERROR"); break;
+ case Z_MEM_ERROR: debugLogA("Deflate: Z_MEM_ERROR"); break;
+ }
+
+ deflateEnd(&zStreamOut);
+ cbRes = dataSize - zStreamOut.avail_out;
+ return pRes;
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
+
FbThrift& FbThrift::operator<<(uint8_t value)
{
m_buf.append(&value, 1);
@@ -135,12 +162,9 @@ void FbThrift::writeIntV(uint64_t value)
} while (!bLast);
}
-MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags, size_t payloadSize)
+MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags)
{
- uint8_t byte = ((type & 0x0F) << 4) | (flags & 0x0F);
- *this << byte;
-
- writeIntV(payloadSize);
+ m_leadingByte = ((type & 0x0F) << 4) | (flags & 0x0F);
}
void MqttMessage::writeStr(const char *str)
@@ -168,8 +192,18 @@ bool FacebookProto::MqttConnect()
return true;
}
+void FacebookProto::MqttSend(const MqttMessage &payload)
+{
+ FbThrift msg;
+ msg << payload.m_leadingByte;
+ msg.writeIntV(payload.size());
+ msg.writeBuf(payload.data(), payload.size());
+ Netlib_Send(m_mqttConn, (char*)msg.data(), msg.size());
+}
+
void FacebookProto::MqttOpen()
{
+ uint8_t zeroByte = 0;
Utils_GetRandom(&m_iMqttId, sizeof(m_iMqttId) / 2);
FbThrift thrift;
@@ -207,42 +241,34 @@ void FacebookProto::MqttOpen()
thrift.writeField(FB_THRIFT_TYPE_LIST, 14, 12);
thrift.writeList(FB_THRIFT_TYPE_I32, 0);
- thrift << (BYTE)0;
+ thrift << zeroByte;
thrift.writeField(FB_THRIFT_TYPE_STRING);
- thrift << m_szAuthToken << (BYTE)0;
-
- size_t dataSize = thrift.size() + 100;
- BYTE *pData = (BYTE *)mir_alloc(dataSize);
-
- z_stream zStreamOut = {};
- deflateInit(&zStreamOut, Z_BEST_COMPRESSION);
- zStreamOut.avail_in = (unsigned)thrift.size();
- zStreamOut.next_in = (BYTE *)thrift.data();
- zStreamOut.avail_out = (unsigned)dataSize;
- zStreamOut.next_out = (BYTE *)pData;
-
- switch (deflate(&zStreamOut, Z_FINISH)) {
- case Z_STREAM_END: debugLogA("Deflate: Z_STREAM_END"); break;
- case Z_OK: debugLogA("Deflate: Z_OK"); break;
- case Z_BUF_ERROR: debugLogA("Deflate: Z_BUF_ERROR"); break;
- case Z_DATA_ERROR: debugLogA("Deflate: Z_DATA_ERROR"); break;
- case Z_MEM_ERROR: debugLogA("Deflate: Z_MEM_ERROR"); break;
- }
+ thrift << m_szAuthToken << zeroByte;
- deflateEnd(&zStreamOut);
- dataSize = dataSize - zStreamOut.avail_out;
+ size_t dataSize;
+ mir_ptr<uint8_t> pData(doZip(thrift.size(), thrift.data(), dataSize));
uint8_t protocolVersion = 3;
uint8_t flags = FB_MQTT_CONNECT_FLAG_USER | FB_MQTT_CONNECT_FLAG_PASS | FB_MQTT_CONNECT_FLAG_CLR | FB_MQTT_CONNECT_FLAG_QOS1;
- MqttMessage payload(FB_MQTT_MESSAGE_TYPE_CONNECT, 0, dataSize + 12); // size of things between size and payload (header, ...)
+ MqttMessage payload(FB_MQTT_MESSAGE_TYPE_CONNECT);
payload.writeStr("MQTToT");
payload << protocolVersion << flags;
payload.writeInt16(60); // timeout
payload.writeBuf(pData, dataSize);
+ MqttSend(payload);
+}
- if (pData != thrift.data())
- mir_free(pData);
+void FacebookProto::MqttPublish(const char *topic, const char *value)
+{
+ debugLogA("Publish: <%s> -> <%s>", topic, value);
- Netlib_Send(m_mqttConn, (char*)payload.data(), (int)payload.size());
+ size_t dataSize;
+ mir_ptr<uint8_t> pData(doZip(strlen(value), value, dataSize));
+
+ MqttMessage payload(FB_MQTT_MESSAGE_TYPE_PUBLISH);
+ payload.writeStr(topic);
+ payload.writeInt16(++m_mid); // timeout
+ payload.writeBuf(pData, dataSize);
+ MqttSend(payload);
}