diff options
Diffstat (limited to 'protocols/Facebook/src/mqtt.cpp')
-rw-r--r-- | protocols/Facebook/src/mqtt.cpp | 88 |
1 files changed, 57 insertions, 31 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp index 2d1e6a02e8..dd92795a4c 100644 --- a/protocols/Facebook/src/mqtt.cpp +++ b/protocols/Facebook/src/mqtt.cpp @@ -50,6 +50,33 @@ static uint8_t encodeType(int type) } } +uint8_t* FacebookProto::doZip(size_t cbData, const void *pData, size_t &cbRes) +{ + size_t dataSize = cbData + 100; + uint8_t *pRes = (uint8_t *)mir_alloc(dataSize); + + z_stream zStreamOut = {}; + deflateInit(&zStreamOut, Z_BEST_COMPRESSION); + zStreamOut.avail_in = (unsigned)cbData; + zStreamOut.next_in = (uint8_t *)pData; + zStreamOut.avail_out = (unsigned)dataSize; + zStreamOut.next_out = (uint8_t *)pRes; + + switch (deflate(&zStreamOut, Z_FINISH)) { + case Z_STREAM_END: debugLogA("Deflate: Z_STREAM_END"); break; + case Z_OK: debugLogA("Deflate: Z_OK"); break; + case Z_BUF_ERROR: debugLogA("Deflate: Z_BUF_ERROR"); break; + case Z_DATA_ERROR: debugLogA("Deflate: Z_DATA_ERROR"); break; + case Z_MEM_ERROR: debugLogA("Deflate: Z_MEM_ERROR"); break; + } + + deflateEnd(&zStreamOut); + cbRes = dataSize - zStreamOut.avail_out; + return pRes; +} + +///////////////////////////////////////////////////////////////////////////////////////// + FbThrift& FbThrift::operator<<(uint8_t value) { m_buf.append(&value, 1); @@ -135,12 +162,9 @@ void FbThrift::writeIntV(uint64_t value) } while (!bLast); } -MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags, size_t payloadSize) +MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags) { - uint8_t byte = ((type & 0x0F) << 4) | (flags & 0x0F); - *this << byte; - - writeIntV(payloadSize); + m_leadingByte = ((type & 0x0F) << 4) | (flags & 0x0F); } void MqttMessage::writeStr(const char *str) @@ -168,8 +192,18 @@ bool FacebookProto::MqttConnect() return true; } +void FacebookProto::MqttSend(const MqttMessage &payload) +{ + FbThrift msg; + msg << payload.m_leadingByte; + msg.writeIntV(payload.size()); + msg.writeBuf(payload.data(), payload.size()); + Netlib_Send(m_mqttConn, (char*)msg.data(), msg.size()); +} + void FacebookProto::MqttOpen() { + uint8_t zeroByte = 0; Utils_GetRandom(&m_iMqttId, sizeof(m_iMqttId) / 2); FbThrift thrift; @@ -207,42 +241,34 @@ void FacebookProto::MqttOpen() thrift.writeField(FB_THRIFT_TYPE_LIST, 14, 12); thrift.writeList(FB_THRIFT_TYPE_I32, 0); - thrift << (BYTE)0; + thrift << zeroByte; thrift.writeField(FB_THRIFT_TYPE_STRING); - thrift << m_szAuthToken << (BYTE)0; - - size_t dataSize = thrift.size() + 100; - BYTE *pData = (BYTE *)mir_alloc(dataSize); - - z_stream zStreamOut = {}; - deflateInit(&zStreamOut, Z_BEST_COMPRESSION); - zStreamOut.avail_in = (unsigned)thrift.size(); - zStreamOut.next_in = (BYTE *)thrift.data(); - zStreamOut.avail_out = (unsigned)dataSize; - zStreamOut.next_out = (BYTE *)pData; - - switch (deflate(&zStreamOut, Z_FINISH)) { - case Z_STREAM_END: debugLogA("Deflate: Z_STREAM_END"); break; - case Z_OK: debugLogA("Deflate: Z_OK"); break; - case Z_BUF_ERROR: debugLogA("Deflate: Z_BUF_ERROR"); break; - case Z_DATA_ERROR: debugLogA("Deflate: Z_DATA_ERROR"); break; - case Z_MEM_ERROR: debugLogA("Deflate: Z_MEM_ERROR"); break; - } + thrift << m_szAuthToken << zeroByte; - deflateEnd(&zStreamOut); - dataSize = dataSize - zStreamOut.avail_out; + size_t dataSize; + mir_ptr<uint8_t> pData(doZip(thrift.size(), thrift.data(), dataSize)); uint8_t protocolVersion = 3; uint8_t flags = FB_MQTT_CONNECT_FLAG_USER | FB_MQTT_CONNECT_FLAG_PASS | FB_MQTT_CONNECT_FLAG_CLR | FB_MQTT_CONNECT_FLAG_QOS1; - MqttMessage payload(FB_MQTT_MESSAGE_TYPE_CONNECT, 0, dataSize + 12); // size of things between size and payload (header, ...) + MqttMessage payload(FB_MQTT_MESSAGE_TYPE_CONNECT); payload.writeStr("MQTToT"); payload << protocolVersion << flags; payload.writeInt16(60); // timeout payload.writeBuf(pData, dataSize); + MqttSend(payload); +} - if (pData != thrift.data()) - mir_free(pData); +void FacebookProto::MqttPublish(const char *topic, const char *value) +{ + debugLogA("Publish: <%s> -> <%s>", topic, value); - Netlib_Send(m_mqttConn, (char*)payload.data(), (int)payload.size()); + size_t dataSize; + mir_ptr<uint8_t> pData(doZip(strlen(value), value, dataSize)); + + MqttMessage payload(FB_MQTT_MESSAGE_TYPE_PUBLISH); + payload.writeStr(topic); + payload.writeInt16(++m_mid); // timeout + payload.writeBuf(pData, dataSize); + MqttSend(payload); } |