summaryrefslogtreecommitdiff
path: root/protocols
diff options
context:
space:
mode:
Diffstat (limited to 'protocols')
-rw-r--r--protocols/Facebook/src/mqtt.cpp25
-rw-r--r--protocols/Facebook/src/proto.h3
-rw-r--r--protocols/Facebook/src/server.cpp31
3 files changed, 42 insertions, 17 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp
index 8d468ece89..00ebf341a4 100644
--- a/protocols/Facebook/src/mqtt.cpp
+++ b/protocols/Facebook/src/mqtt.cpp
@@ -308,3 +308,28 @@ void FacebookProto::MqttUnsubscribe(const char *topic, ...)
MqttSend(payload);
}
+
+/////////////////////////////////////////////////////////////////////////////////////////
+// MQTT queue
+
+void FacebookProto::MqttQueueConnect()
+{
+ JSONNode query;
+ query << INT_PARAM("delta_batch_size", 125) << INT_PARAM("max_deltas_able_to_process", 1000) << INT_PARAM("sync_api_version", 3) << CHAR_PARAM("encoding", "JSON");
+ if (m_szSyncToken.IsEmpty()) {
+ JSONNode hashes; hashes.set_name("graphql_query_hashes"); hashes << CHAR_PARAM("xma_query_id", __STRINGIFY(FB_API_QUERY_XMA));
+
+ JSONNode xma; xma.set_name(__STRINGIFY(FB_API_QUERY_XMA)); xma << CHAR_PARAM("xma_id", "<ID>");
+ JSONNode hql; hql.set_name("graphql_query_params"); hql << xma;
+
+ JSONNode params; params.set_name("queue_params");
+ params << CHAR_PARAM("buzz_on_deltas_enabled", "false") << hashes << hql;
+
+ query << INT64_PARAM("initial_titan_sequence_id", m_sid) << CHAR_PARAM("device_id", m_szDeviceID) << INT64_PARAM("entity_fbid", m_uid) << params;
+ MqttPublish("/messenger_sync_create_queue", query.write().c_str());
+ }
+ else {
+ query << INT64_PARAM("last_seq_id", m_sid) << CHAR_PARAM("sync_token", m_szSyncToken);
+ MqttPublish("/messenger_sync_get_diffs", query.write().c_str());
+ }
+}
diff --git a/protocols/Facebook/src/proto.h b/protocols/Facebook/src/proto.h
index 4015244ac6..2157642a06 100644
--- a/protocols/Facebook/src/proto.h
+++ b/protocols/Facebook/src/proto.h
@@ -413,6 +413,8 @@ class FacebookProto : public PROTO<FacebookProto>
bool MqttParse(const MqttMessage &payload);
void MqttSend(const MqttMessage &payload);
+ void MqttQueueConnect();
+
void OnPublish(const char *str, const uint8_t *payLoad, size_t cbLen);
void OnPublishMessage(FbThriftReader &rdr);
void OnPublishPresence(FbThriftReader &rdr);
@@ -433,6 +435,7 @@ class FacebookProto : public PROTO<FacebookProto>
int m_iUnread;
bool m_invisible;
bool m_bOnline;
+ bool m_QueueCreated;
CMStringA m_szAuthToken; // calculated
diff --git a/protocols/Facebook/src/server.cpp b/protocols/Facebook/src/server.cpp
index cf6ac2f2d7..43c12a51c4 100644
--- a/protocols/Facebook/src/server.cpp
+++ b/protocols/Facebook/src/server.cpp
@@ -65,24 +65,8 @@ void FacebookProto::OnLoggedIn()
m_impl.m_heartBeat.Start(60000);
// connect message queue
- JSONNode query;
- query << INT_PARAM("delta_batch_size", 125) << INT_PARAM("max_deltas_able_to_process", 1000) << INT_PARAM("sync_api_version", 3) << CHAR_PARAM("encoding", "JSON");
- if (m_szSyncToken.IsEmpty()) {
- JSONNode hashes; hashes.set_name("graphql_query_hashes"); hashes << CHAR_PARAM("xma_query_id", __STRINGIFY(FB_API_QUERY_XMA));
+ MqttQueueConnect();
- JSONNode xma; xma.set_name(__STRINGIFY(FB_API_QUERY_XMA)); xma << CHAR_PARAM("xma_id", "<ID>");
- JSONNode hql; hql.set_name("graphql_query_params"); hql << xma;
-
- JSONNode params; params.set_name("queue_params");
- params << CHAR_PARAM("buzz_on_deltas_enabled", "false") << hashes << hql;
-
- query << INT64_PARAM("initial_titan_sequence_id", m_sid) << CHAR_PARAM("device_id", m_szDeviceID) << INT64_PARAM("entity_fbid", m_uid) << params;
- MqttPublish("/messenger_sync_create_queue", query.write().c_str());
- }
- else {
- query << INT64_PARAM("last_seq_id", m_sid) << CHAR_PARAM("sync_token", m_szSyncToken);
- MqttPublish("/messenger_sync_get_diffs", query.write().c_str());
- }
}
void FacebookProto::OnLoggedOut()
@@ -198,6 +182,8 @@ bool FacebookProto::RefreshToken()
void FacebookProto::ServerThread(void *)
{
+ m_QueueCreated = false;
+
LBL_Begin:
m_szAuthToken = getMStringA(DBKEY_TOKEN);
if (m_szAuthToken.IsEmpty()) {
@@ -393,6 +379,17 @@ void FacebookProto::OnPublishMessage(FbThriftReader &rdr)
debugLogA("MS: <%s>", szJson.c_str());
JSONNode root = JSONNode::parse(szJson);
+ CMStringA errorCode = root["errorCode"].as_mstring();
+ if (!errorCode.IsEmpty()) {
+ if (!m_QueueCreated && (errorCode == "ERROR_QUEUE_NOT_FOUND" || errorCode == "ERROR_QUEUE_LOST" )) {
+ m_QueueCreated = true; // prevent queue creation request from being sent twice
+ m_szSyncToken.Empty();
+ delSetting(DBKEY_SYNC_TOKEN);
+ MqttQueueConnect();
+ }
+ return;
+ }
+
CMStringW str = root["lastIssuedSeqId"].as_mstring();
if (!str.IsEmpty()) {
setWString(DBKEY_SID, str);