diff options
author | George Hazan <ghazan@miranda.im> | 2019-12-18 20:39:59 +0300 |
---|---|---|
committer | George Hazan <ghazan@miranda.im> | 2019-12-18 20:39:59 +0300 |
commit | fe77d2543f2822b6637c4090887bf349fc4ccb75 (patch) | |
tree | c3ab1b2e4e9a1c4ca3bf1ee8a5b9a316e77ab83b /protocols | |
parent | 406f0e4a4f42979778757243ed29c2753cbb1107 (diff) |
Facebook:
- MqttSend now calculates payload size automatically (no need to add some magic numbers);
- support for MQTT publish requests;
- minor code cleaning
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/Facebook/src/mqtt.cpp | 88 | ||||
-rw-r--r-- | protocols/Facebook/src/mqtt.h | 10 | ||||
-rw-r--r-- | protocols/Facebook/src/proto.h | 7 | ||||
-rw-r--r-- | protocols/Facebook/src/server.cpp | 7 |
4 files changed, 77 insertions, 35 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp index 2d1e6a02e8..dd92795a4c 100644 --- a/protocols/Facebook/src/mqtt.cpp +++ b/protocols/Facebook/src/mqtt.cpp @@ -50,6 +50,33 @@ static uint8_t encodeType(int type) } } +uint8_t* FacebookProto::doZip(size_t cbData, const void *pData, size_t &cbRes) +{ + size_t dataSize = cbData + 100; + uint8_t *pRes = (uint8_t *)mir_alloc(dataSize); + + z_stream zStreamOut = {}; + deflateInit(&zStreamOut, Z_BEST_COMPRESSION); + zStreamOut.avail_in = (unsigned)cbData; + zStreamOut.next_in = (uint8_t *)pData; + zStreamOut.avail_out = (unsigned)dataSize; + zStreamOut.next_out = (uint8_t *)pRes; + + switch (deflate(&zStreamOut, Z_FINISH)) { + case Z_STREAM_END: debugLogA("Deflate: Z_STREAM_END"); break; + case Z_OK: debugLogA("Deflate: Z_OK"); break; + case Z_BUF_ERROR: debugLogA("Deflate: Z_BUF_ERROR"); break; + case Z_DATA_ERROR: debugLogA("Deflate: Z_DATA_ERROR"); break; + case Z_MEM_ERROR: debugLogA("Deflate: Z_MEM_ERROR"); break; + } + + deflateEnd(&zStreamOut); + cbRes = dataSize - zStreamOut.avail_out; + return pRes; +} + +///////////////////////////////////////////////////////////////////////////////////////// + FbThrift& FbThrift::operator<<(uint8_t value) { m_buf.append(&value, 1); @@ -135,12 +162,9 @@ void FbThrift::writeIntV(uint64_t value) } while (!bLast); } -MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags, size_t payloadSize) +MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags) { - uint8_t byte = ((type & 0x0F) << 4) | (flags & 0x0F); - *this << byte; - - writeIntV(payloadSize); + m_leadingByte = ((type & 0x0F) << 4) | (flags & 0x0F); } void MqttMessage::writeStr(const char *str) @@ -168,8 +192,18 @@ bool FacebookProto::MqttConnect() return true; } +void FacebookProto::MqttSend(const MqttMessage &payload) +{ + FbThrift msg; + msg << payload.m_leadingByte; + msg.writeIntV(payload.size()); + msg.writeBuf(payload.data(), payload.size()); + Netlib_Send(m_mqttConn, (char*)msg.data(), msg.size()); +} + void FacebookProto::MqttOpen() { + uint8_t zeroByte = 0; Utils_GetRandom(&m_iMqttId, sizeof(m_iMqttId) / 2); FbThrift thrift; @@ -207,42 +241,34 @@ void FacebookProto::MqttOpen() thrift.writeField(FB_THRIFT_TYPE_LIST, 14, 12); thrift.writeList(FB_THRIFT_TYPE_I32, 0); - thrift << (BYTE)0; + thrift << zeroByte; thrift.writeField(FB_THRIFT_TYPE_STRING); - thrift << m_szAuthToken << (BYTE)0; - - size_t dataSize = thrift.size() + 100; - BYTE *pData = (BYTE *)mir_alloc(dataSize); - - z_stream zStreamOut = {}; - deflateInit(&zStreamOut, Z_BEST_COMPRESSION); - zStreamOut.avail_in = (unsigned)thrift.size(); - zStreamOut.next_in = (BYTE *)thrift.data(); - zStreamOut.avail_out = (unsigned)dataSize; - zStreamOut.next_out = (BYTE *)pData; - - switch (deflate(&zStreamOut, Z_FINISH)) { - case Z_STREAM_END: debugLogA("Deflate: Z_STREAM_END"); break; - case Z_OK: debugLogA("Deflate: Z_OK"); break; - case Z_BUF_ERROR: debugLogA("Deflate: Z_BUF_ERROR"); break; - case Z_DATA_ERROR: debugLogA("Deflate: Z_DATA_ERROR"); break; - case Z_MEM_ERROR: debugLogA("Deflate: Z_MEM_ERROR"); break; - } + thrift << m_szAuthToken << zeroByte; - deflateEnd(&zStreamOut); - dataSize = dataSize - zStreamOut.avail_out; + size_t dataSize; + mir_ptr<uint8_t> pData(doZip(thrift.size(), thrift.data(), dataSize)); uint8_t protocolVersion = 3; uint8_t flags = FB_MQTT_CONNECT_FLAG_USER | FB_MQTT_CONNECT_FLAG_PASS | FB_MQTT_CONNECT_FLAG_CLR | FB_MQTT_CONNECT_FLAG_QOS1; - MqttMessage payload(FB_MQTT_MESSAGE_TYPE_CONNECT, 0, dataSize + 12); // size of things between size and payload (header, ...) + MqttMessage payload(FB_MQTT_MESSAGE_TYPE_CONNECT); payload.writeStr("MQTToT"); payload << protocolVersion << flags; payload.writeInt16(60); // timeout payload.writeBuf(pData, dataSize); + MqttSend(payload); +} - if (pData != thrift.data()) - mir_free(pData); +void FacebookProto::MqttPublish(const char *topic, const char *value) +{ + debugLogA("Publish: <%s> -> <%s>", topic, value); - Netlib_Send(m_mqttConn, (char*)payload.data(), (int)payload.size()); + size_t dataSize; + mir_ptr<uint8_t> pData(doZip(strlen(value), value, dataSize)); + + MqttMessage payload(FB_MQTT_MESSAGE_TYPE_PUBLISH); + payload.writeStr(topic); + payload.writeInt16(++m_mid); // timeout + payload.writeBuf(pData, dataSize); + MqttSend(payload); } diff --git a/protocols/Facebook/src/mqtt.h b/protocols/Facebook/src/mqtt.h index 8b9597f304..3dd0dc6a20 100644 --- a/protocols/Facebook/src/mqtt.h +++ b/protocols/Facebook/src/mqtt.h @@ -59,8 +59,8 @@ class FbThrift MBinBuffer m_buf; public: - __inline void* data() { return m_buf.data(); } - __inline size_t size() { return m_buf.length(); } + __inline void* data() const { return m_buf.data(); } + __inline size_t size() const { return m_buf.length(); } FbThrift& operator<<(uint8_t); FbThrift& operator<<(const char *); @@ -78,8 +78,12 @@ public: class MqttMessage : public FbThrift { + friend class FacebookProto; + + uint8_t m_leadingByte; + public: - MqttMessage(FbMqttMessageType type, uint8_t flags, size_t payloadSize); + MqttMessage(FbMqttMessageType type, uint8_t flags = 0); void writeStr(const char *str); }; diff --git a/protocols/Facebook/src/proto.h b/protocols/Facebook/src/proto.h index 761b49edb2..f64eb5c3ea 100644 --- a/protocols/Facebook/src/proto.h +++ b/protocols/Facebook/src/proto.h @@ -339,13 +339,17 @@ public: class FacebookProto : public PROTO<FacebookProto> { + uint8_t* doZip(size_t cbData, const void *pData, size_t &cbRes); + AsyncHttpRequest *CreateRequest(const char *szName, const char *szMethod); AsyncHttpRequest *CreateRequestGQL(int64_t id); NETLIBHTTPREQUEST *ExecuteRequest(AsyncHttpRequest *pReq); // MQTT functions bool MqttConnect(); - void MqttOpen(); + void MqttOpen(); + void MqttPublish(const char *topic, const char *value); + void MqttSend(const MqttMessage &payload); HNETLIBCONN m_mqttConn; @@ -354,6 +358,7 @@ class FacebookProto : public PROTO<FacebookProto> CMStringA m_szClientID; // stored, random alphanumeric string of 20 chars __int64 m_uid; // stored, Facebook user id __int64 m_iMqttId; + int16_t m_mid; bool m_invisible; bool m_bOnline; diff --git a/protocols/Facebook/src/server.cpp b/protocols/Facebook/src/server.cpp index 72942d8cce..9eb13b9378 100644 --- a/protocols/Facebook/src/server.cpp +++ b/protocols/Facebook/src/server.cpp @@ -23,9 +23,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. void FacebookProto::OnLoggedIn() { m_bOnline = true; + m_mid = 0; ProtoBroadcastAck(0, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)m_iStatus, m_iDesiredStatus); m_iStatus = m_iDesiredStatus; + + // MqttPublish("/foreground_state", "{\"foreground\":\"true\", \"keepalive_timeout\":\"60\"}"); } void FacebookProto::OnLoggedOut() @@ -107,6 +110,10 @@ FAIL: } debugLogA("exiting ServerThread"); + + Netlib_CloseHandle(m_mqttConn); + m_mqttConn = nullptr; + int oldStatus = m_iStatus; m_iDesiredStatus = m_iStatus = ID_STATUS_OFFLINE; ProtoBroadcastAck(0, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)oldStatus, m_iStatus); |