diff options
author | George Hazan <ghazan@miranda.im> | 2019-12-23 16:46:46 +0300 |
---|---|---|
committer | George Hazan <ghazan@miranda.im> | 2019-12-23 16:46:46 +0300 |
commit | 8d2d77dbdc720b55647e107305162b25067457f4 (patch) | |
tree | a4a7569211b1240cfec9f4f1862f592dddefb23f /protocols/Facebook | |
parent | a29c9194cf2d9f11839ea833872beab754e19527 (diff) |
Facebook:
- MQTT message reader & parser (initial version);
- MQTT ping;
Diffstat (limited to 'protocols/Facebook')
-rw-r--r-- | protocols/Facebook/src/mqtt.cpp | 69 | ||||
-rw-r--r-- | protocols/Facebook/src/mqtt.h | 10 | ||||
-rw-r--r-- | protocols/Facebook/src/proto.h | 5 | ||||
-rw-r--r-- | protocols/Facebook/src/server.cpp | 33 |
4 files changed, 109 insertions, 8 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp index e8c3638741..160048318d 100644 --- a/protocols/Facebook/src/mqtt.cpp +++ b/protocols/Facebook/src/mqtt.cpp @@ -162,6 +162,11 @@ void FbThrift::writeIntV(uint64_t value) } while (!bLast); } +MqttMessage::MqttMessage() : + m_leadingByte(0) +{ +} + MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags) { m_leadingByte = ((type & 0x0F) << 4) | (flags & 0x0F); @@ -192,6 +197,58 @@ bool FacebookProto::MqttConnect() return true; } +bool FacebookProto::MqttParse(const MqttMessage &payload) +{ + auto *pData = (const uint8_t *)payload.data(); + + switch (payload.getType()) { + case FB_MQTT_MESSAGE_TYPE_CONNACK: + if (pData[1] != 0) { // connection failed; + ProtoBroadcastAck(0, ACKTYPE_LOGIN, ACKRESULT_FAILED, (HANDLE)m_iStatus, m_iDesiredStatus); + return false; + } + + OnLoggedIn(); + MqttPing(); + break; + } + + return true; +} + +bool FacebookProto::MqttRead(MqttMessage &payload) +{ + uint8_t b; + int res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b)); + if (res != 1) + return false; + + payload.m_leadingByte = b; + + uint32_t m = 1, remainingBytes = 0; + do { + if ((res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b))) != 1) + return false; + + remainingBytes += (b & 0x7F) * m; + m *= 128; + } while ((b & 0x80) != 0); + + if (remainingBytes != 0) { + while (remainingBytes > 0) { + uint8_t buf[1024]; + int size = min(remainingBytes, sizeof(buf)); + if ((res = Netlib_Recv(m_mqttConn, (char *)buf, size)) <= 0) + return false; + + payload.writeBuf(buf, res); + remainingBytes -= res; + } + } + + return true; +} + void FacebookProto::MqttSend(const MqttMessage &payload) { FbThrift msg; @@ -201,6 +258,9 @@ void FacebookProto::MqttSend(const MqttMessage &payload) Netlib_Send(m_mqttConn, (char*)msg.data(), (unsigned)msg.size()); } +///////////////////////////////////////////////////////////////////////////////////////// +// creates initial MQTT will and sends initialization packet + void FacebookProto::MqttOpen() { uint8_t zeroByte = 0; @@ -259,6 +319,15 @@ void FacebookProto::MqttOpen() MqttSend(payload); } +///////////////////////////////////////////////////////////////////////////////////////// +// various MQTT send commands + +void FacebookProto::MqttPing() +{ + MqttMessage payload(FB_MQTT_MESSAGE_TYPE_PINGREQ); + MqttSend(payload); +} + void FacebookProto::MqttPublish(const char *topic, const char *value) { debugLogA("Publish: <%s> -> <%s>", topic, value); diff --git a/protocols/Facebook/src/mqtt.h b/protocols/Facebook/src/mqtt.h index 3dd0dc6a20..70a2fc4671 100644 --- a/protocols/Facebook/src/mqtt.h +++ b/protocols/Facebook/src/mqtt.h @@ -83,8 +83,18 @@ class MqttMessage : public FbThrift uint8_t m_leadingByte; public: + MqttMessage(); MqttMessage(FbMqttMessageType type, uint8_t flags = 0); + __forceinline int getType() const + { + return m_leadingByte >> 4; + } + + __forceinline int getFlags() const + { return m_leadingByte & 0x0F; + } + void writeStr(const char *str); }; diff --git a/protocols/Facebook/src/proto.h b/protocols/Facebook/src/proto.h index 5d43e5bab8..9a951236b1 100644 --- a/protocols/Facebook/src/proto.h +++ b/protocols/Facebook/src/proto.h @@ -348,9 +348,14 @@ class FacebookProto : public PROTO<FacebookProto> // MQTT functions bool MqttConnect(); void MqttOpen(); + + void MqttPing(); void MqttPublish(const char *topic, const char *value); void MqttSubscribe(const char *topic, ...); void MqttUnsubscribe(const char *topic, ...); + + bool MqttRead(MqttMessage &payload); + bool MqttParse(const MqttMessage &payload); void MqttSend(const MqttMessage &payload); HNETLIBCONN m_mqttConn; diff --git a/protocols/Facebook/src/server.cpp b/protocols/Facebook/src/server.cpp index c54c46e29e..a2753154e5 100644 --- a/protocols/Facebook/src/server.cpp +++ b/protocols/Facebook/src/server.cpp @@ -88,28 +88,45 @@ FAIL: return; } - // TODO: process contacts + // connect to MQTT server if (!MqttConnect()) goto FAIL; + // send initial packet MqttOpen(); - OnLoggedIn(); - int bufSize = 2048; - char *buf = (char *)mir_alloc(bufSize); + __int64 startTime = GetTickCount64(); while (!Miranda_IsTerminated()) { - int ret = Netlib_Recv(m_mqttConn, buf, bufSize); + NETLIBSELECT nls = {}; + nls.hReadConns[0] = m_mqttConn; + nls.dwTimeout = 1000; + int ret = Netlib_Select(&nls); if (ret == SOCKET_ERROR) { debugLogA("Netlib_Recv() failed, error=%d", WSAGetLastError()); break; } - if (ret == 0) { - debugLogA("Connection closed gracefully"); + + __int64 currTime = GetTickCount64(); + if (currTime - startTime > 60000) { + startTime = currTime; + MqttPing(); + } + + // no data, continue waiting + if (ret == 0) + continue; + + MqttMessage msg; + if (!MqttRead(msg)) { + debugLogA("MqttRead() failed"); break; } - // TODO: process MQTT responses + if (!MqttParse(msg)) { + debugLogA("MqttParse() failed"); + break; + } } debugLogA("exiting ServerThread"); |