summaryrefslogtreecommitdiff
path: root/protocols/Facebook
diff options
context:
space:
mode:
authorGeorge Hazan <ghazan@miranda.im>2019-12-23 16:46:46 +0300
committerGeorge Hazan <ghazan@miranda.im>2019-12-23 16:46:46 +0300
commit8d2d77dbdc720b55647e107305162b25067457f4 (patch)
treea4a7569211b1240cfec9f4f1862f592dddefb23f /protocols/Facebook
parenta29c9194cf2d9f11839ea833872beab754e19527 (diff)
Facebook:
- MQTT message reader & parser (initial version); - MQTT ping;
Diffstat (limited to 'protocols/Facebook')
-rw-r--r--protocols/Facebook/src/mqtt.cpp69
-rw-r--r--protocols/Facebook/src/mqtt.h10
-rw-r--r--protocols/Facebook/src/proto.h5
-rw-r--r--protocols/Facebook/src/server.cpp33
4 files changed, 109 insertions, 8 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp
index e8c3638741..160048318d 100644
--- a/protocols/Facebook/src/mqtt.cpp
+++ b/protocols/Facebook/src/mqtt.cpp
@@ -162,6 +162,11 @@ void FbThrift::writeIntV(uint64_t value)
} while (!bLast);
}
+MqttMessage::MqttMessage() :
+ m_leadingByte(0)
+{
+}
+
MqttMessage::MqttMessage(FbMqttMessageType type, uint8_t flags)
{
m_leadingByte = ((type & 0x0F) << 4) | (flags & 0x0F);
@@ -192,6 +197,58 @@ bool FacebookProto::MqttConnect()
return true;
}
+bool FacebookProto::MqttParse(const MqttMessage &payload)
+{
+ auto *pData = (const uint8_t *)payload.data();
+
+ switch (payload.getType()) {
+ case FB_MQTT_MESSAGE_TYPE_CONNACK:
+ if (pData[1] != 0) { // connection failed;
+ ProtoBroadcastAck(0, ACKTYPE_LOGIN, ACKRESULT_FAILED, (HANDLE)m_iStatus, m_iDesiredStatus);
+ return false;
+ }
+
+ OnLoggedIn();
+ MqttPing();
+ break;
+ }
+
+ return true;
+}
+
+bool FacebookProto::MqttRead(MqttMessage &payload)
+{
+ uint8_t b;
+ int res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b));
+ if (res != 1)
+ return false;
+
+ payload.m_leadingByte = b;
+
+ uint32_t m = 1, remainingBytes = 0;
+ do {
+ if ((res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b))) != 1)
+ return false;
+
+ remainingBytes += (b & 0x7F) * m;
+ m *= 128;
+ } while ((b & 0x80) != 0);
+
+ if (remainingBytes != 0) {
+ while (remainingBytes > 0) {
+ uint8_t buf[1024];
+ int size = min(remainingBytes, sizeof(buf));
+ if ((res = Netlib_Recv(m_mqttConn, (char *)buf, size)) <= 0)
+ return false;
+
+ payload.writeBuf(buf, res);
+ remainingBytes -= res;
+ }
+ }
+
+ return true;
+}
+
void FacebookProto::MqttSend(const MqttMessage &payload)
{
FbThrift msg;
@@ -201,6 +258,9 @@ void FacebookProto::MqttSend(const MqttMessage &payload)
Netlib_Send(m_mqttConn, (char*)msg.data(), (unsigned)msg.size());
}
+/////////////////////////////////////////////////////////////////////////////////////////
+// creates initial MQTT will and sends initialization packet
+
void FacebookProto::MqttOpen()
{
uint8_t zeroByte = 0;
@@ -259,6 +319,15 @@ void FacebookProto::MqttOpen()
MqttSend(payload);
}
+/////////////////////////////////////////////////////////////////////////////////////////
+// various MQTT send commands
+
+void FacebookProto::MqttPing()
+{
+ MqttMessage payload(FB_MQTT_MESSAGE_TYPE_PINGREQ);
+ MqttSend(payload);
+}
+
void FacebookProto::MqttPublish(const char *topic, const char *value)
{
debugLogA("Publish: <%s> -> <%s>", topic, value);
diff --git a/protocols/Facebook/src/mqtt.h b/protocols/Facebook/src/mqtt.h
index 3dd0dc6a20..70a2fc4671 100644
--- a/protocols/Facebook/src/mqtt.h
+++ b/protocols/Facebook/src/mqtt.h
@@ -83,8 +83,18 @@ class MqttMessage : public FbThrift
uint8_t m_leadingByte;
public:
+ MqttMessage();
MqttMessage(FbMqttMessageType type, uint8_t flags = 0);
+ __forceinline int getType() const
+ {
+ return m_leadingByte >> 4;
+ }
+
+ __forceinline int getFlags() const
+ { return m_leadingByte & 0x0F;
+ }
+
void writeStr(const char *str);
};
diff --git a/protocols/Facebook/src/proto.h b/protocols/Facebook/src/proto.h
index 5d43e5bab8..9a951236b1 100644
--- a/protocols/Facebook/src/proto.h
+++ b/protocols/Facebook/src/proto.h
@@ -348,9 +348,14 @@ class FacebookProto : public PROTO<FacebookProto>
// MQTT functions
bool MqttConnect();
void MqttOpen();
+
+ void MqttPing();
void MqttPublish(const char *topic, const char *value);
void MqttSubscribe(const char *topic, ...);
void MqttUnsubscribe(const char *topic, ...);
+
+ bool MqttRead(MqttMessage &payload);
+ bool MqttParse(const MqttMessage &payload);
void MqttSend(const MqttMessage &payload);
HNETLIBCONN m_mqttConn;
diff --git a/protocols/Facebook/src/server.cpp b/protocols/Facebook/src/server.cpp
index c54c46e29e..a2753154e5 100644
--- a/protocols/Facebook/src/server.cpp
+++ b/protocols/Facebook/src/server.cpp
@@ -88,28 +88,45 @@ FAIL:
return;
}
- // TODO: process contacts
+ // connect to MQTT server
if (!MqttConnect())
goto FAIL;
+ // send initial packet
MqttOpen();
- OnLoggedIn();
- int bufSize = 2048;
- char *buf = (char *)mir_alloc(bufSize);
+ __int64 startTime = GetTickCount64();
while (!Miranda_IsTerminated()) {
- int ret = Netlib_Recv(m_mqttConn, buf, bufSize);
+ NETLIBSELECT nls = {};
+ nls.hReadConns[0] = m_mqttConn;
+ nls.dwTimeout = 1000;
+ int ret = Netlib_Select(&nls);
if (ret == SOCKET_ERROR) {
debugLogA("Netlib_Recv() failed, error=%d", WSAGetLastError());
break;
}
- if (ret == 0) {
- debugLogA("Connection closed gracefully");
+
+ __int64 currTime = GetTickCount64();
+ if (currTime - startTime > 60000) {
+ startTime = currTime;
+ MqttPing();
+ }
+
+ // no data, continue waiting
+ if (ret == 0)
+ continue;
+
+ MqttMessage msg;
+ if (!MqttRead(msg)) {
+ debugLogA("MqttRead() failed");
break;
}
- // TODO: process MQTT responses
+ if (!MqttParse(msg)) {
+ debugLogA("MqttParse() failed");
+ break;
+ }
}
debugLogA("exiting ServerThread");