summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--protocols/Facebook/src/mqtt.cpp46
-rw-r--r--protocols/Facebook/src/mqtt.h8
-rw-r--r--protocols/Facebook/src/proto.h3
3 files changed, 51 insertions, 6 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp
index 160048318d..72ed222079 100644
--- a/protocols/Facebook/src/mqtt.cpp
+++ b/protocols/Facebook/src/mqtt.cpp
@@ -50,7 +50,7 @@ static uint8_t encodeType(int type)
}
}
-uint8_t* FacebookProto::doZip(size_t cbData, const void *pData, size_t &cbRes)
+uint8_t *FacebookProto::doZip(size_t cbData, const void *pData, size_t &cbRes)
{
size_t dataSize = cbData + 100;
uint8_t *pRes = (uint8_t *)mir_alloc(dataSize);
@@ -75,6 +75,31 @@ uint8_t* FacebookProto::doZip(size_t cbData, const void *pData, size_t &cbRes)
return pRes;
}
+uint8_t *FacebookProto::doUnzip(size_t cbData, const void *pData, size_t &cbRes)
+{
+ size_t dataSize = cbData * 10;
+ uint8_t *pRes = (uint8_t *)mir_alloc(dataSize);
+
+ z_stream zStreamOut = {};
+ inflateInit(&zStreamOut);
+ zStreamOut.avail_in = (unsigned)cbData;
+ zStreamOut.next_in = (uint8_t *)pData;
+ zStreamOut.avail_out = (unsigned)dataSize;
+ zStreamOut.next_out = (uint8_t *)pRes;
+
+ switch (inflate(&zStreamOut, Z_FINISH)) {
+ case Z_STREAM_END: debugLogA("Deflate: Z_STREAM_END"); break;
+ case Z_OK: debugLogA("Deflate: Z_OK"); break;
+ case Z_BUF_ERROR: debugLogA("Deflate: Z_BUF_ERROR"); break;
+ case Z_DATA_ERROR: debugLogA("Deflate: Z_DATA_ERROR"); break;
+ case Z_MEM_ERROR: debugLogA("Deflate: Z_MEM_ERROR"); break;
+ }
+
+ inflateEnd(&zStreamOut);
+ cbRes = dataSize - zStreamOut.avail_out;
+ return pRes;
+}
+
/////////////////////////////////////////////////////////////////////////////////////////
FbThrift& FbThrift::operator<<(uint8_t value)
@@ -219,7 +244,7 @@ bool FacebookProto::MqttParse(const MqttMessage &payload)
bool FacebookProto::MqttRead(MqttMessage &payload)
{
uint8_t b;
- int res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b));
+ int res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b), MSG_NODUMP);
if (res != 1)
return false;
@@ -227,13 +252,15 @@ bool FacebookProto::MqttRead(MqttMessage &payload)
uint32_t m = 1, remainingBytes = 0;
do {
- if ((res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b))) != 1)
+ if ((res = Netlib_Recv(m_mqttConn, (char *)&b, sizeof(b), MSG_NODUMP)) != 1)
return false;
remainingBytes += (b & 0x7F) * m;
m *= 128;
} while ((b & 0x80) != 0);
+ debugLogA("Received message of type=%d, flags=%x, body length=%d", payload.getType(), payload.getFlags(), remainingBytes);
+
if (remainingBytes != 0) {
while (remainingBytes > 0) {
uint8_t buf[1024];
@@ -244,6 +271,19 @@ bool FacebookProto::MqttRead(MqttMessage &payload)
payload.writeBuf(buf, res);
remainingBytes -= res;
}
+
+ // that might be a zipped buffer
+ if (payload.size() >= 2) {
+ auto *p = (const uint8_t *)payload.data();
+ if ((((p[0] << 8) | p[1]) % 31) == 0 && (p[0] & 0x0F) == 8) { // zip header ok
+ size_t dataSize;
+ void *pData = doUnzip(payload.size(), payload.data(), dataSize);
+ if (pData != nullptr) {
+ payload.reset(dataSize, pData);
+ mir_free(pData);
+ }
+ }
+ }
}
return true;
diff --git a/protocols/Facebook/src/mqtt.h b/protocols/Facebook/src/mqtt.h
index 70a2fc4671..dc7fcd41fc 100644
--- a/protocols/Facebook/src/mqtt.h
+++ b/protocols/Facebook/src/mqtt.h
@@ -59,8 +59,12 @@ class FbThrift
MBinBuffer m_buf;
public:
- __inline void* data() const { return m_buf.data(); }
- __inline size_t size() const { return m_buf.length(); }
+ __forceinline void* data() const { return m_buf.data(); }
+ __forceinline size_t size() const { return m_buf.length(); }
+
+ __forceinline void reset(const size_t cbLen, void *pData)
+ { m_buf.assign(pData, cbLen);
+ }
FbThrift& operator<<(uint8_t);
FbThrift& operator<<(const char *);
diff --git a/protocols/Facebook/src/proto.h b/protocols/Facebook/src/proto.h
index 9a951236b1..01a0bae4f4 100644
--- a/protocols/Facebook/src/proto.h
+++ b/protocols/Facebook/src/proto.h
@@ -339,7 +339,8 @@ public:
class FacebookProto : public PROTO<FacebookProto>
{
- uint8_t* doZip(size_t cbData, const void *pData, size_t &cbRes);
+ uint8_t *doZip(size_t cbData, const void *pData, size_t &cbRes);
+ uint8_t *doUnzip(size_t cbData, const void *pData, size_t &cbRes);
AsyncHttpRequest *CreateRequest(const char *szName, const char *szMethod);
AsyncHttpRequest *CreateRequestGQL(int64_t id);