diff options
Diffstat (limited to 'protocols')
-rw-r--r-- | protocols/Facebook/src/mqtt.cpp | 25 | ||||
-rw-r--r-- | protocols/Facebook/src/proto.h | 3 | ||||
-rw-r--r-- | protocols/Facebook/src/server.cpp | 31 |
3 files changed, 42 insertions, 17 deletions
diff --git a/protocols/Facebook/src/mqtt.cpp b/protocols/Facebook/src/mqtt.cpp index 8d468ece89..00ebf341a4 100644 --- a/protocols/Facebook/src/mqtt.cpp +++ b/protocols/Facebook/src/mqtt.cpp @@ -308,3 +308,28 @@ void FacebookProto::MqttUnsubscribe(const char *topic, ...) MqttSend(payload); } + +///////////////////////////////////////////////////////////////////////////////////////// +// MQTT queue + +void FacebookProto::MqttQueueConnect() +{ + JSONNode query; + query << INT_PARAM("delta_batch_size", 125) << INT_PARAM("max_deltas_able_to_process", 1000) << INT_PARAM("sync_api_version", 3) << CHAR_PARAM("encoding", "JSON"); + if (m_szSyncToken.IsEmpty()) { + JSONNode hashes; hashes.set_name("graphql_query_hashes"); hashes << CHAR_PARAM("xma_query_id", __STRINGIFY(FB_API_QUERY_XMA)); + + JSONNode xma; xma.set_name(__STRINGIFY(FB_API_QUERY_XMA)); xma << CHAR_PARAM("xma_id", "<ID>"); + JSONNode hql; hql.set_name("graphql_query_params"); hql << xma; + + JSONNode params; params.set_name("queue_params"); + params << CHAR_PARAM("buzz_on_deltas_enabled", "false") << hashes << hql; + + query << INT64_PARAM("initial_titan_sequence_id", m_sid) << CHAR_PARAM("device_id", m_szDeviceID) << INT64_PARAM("entity_fbid", m_uid) << params; + MqttPublish("/messenger_sync_create_queue", query.write().c_str()); + } + else { + query << INT64_PARAM("last_seq_id", m_sid) << CHAR_PARAM("sync_token", m_szSyncToken); + MqttPublish("/messenger_sync_get_diffs", query.write().c_str()); + } +} diff --git a/protocols/Facebook/src/proto.h b/protocols/Facebook/src/proto.h index 4015244ac6..2157642a06 100644 --- a/protocols/Facebook/src/proto.h +++ b/protocols/Facebook/src/proto.h @@ -413,6 +413,8 @@ class FacebookProto : public PROTO<FacebookProto> bool MqttParse(const MqttMessage &payload); void MqttSend(const MqttMessage &payload); + void MqttQueueConnect(); + void OnPublish(const char *str, const uint8_t *payLoad, size_t cbLen); void OnPublishMessage(FbThriftReader &rdr); void OnPublishPresence(FbThriftReader &rdr); @@ -433,6 +435,7 @@ class FacebookProto : public PROTO<FacebookProto> int m_iUnread; bool m_invisible; bool m_bOnline; + bool m_QueueCreated; CMStringA m_szAuthToken; // calculated diff --git a/protocols/Facebook/src/server.cpp b/protocols/Facebook/src/server.cpp index cf6ac2f2d7..43c12a51c4 100644 --- a/protocols/Facebook/src/server.cpp +++ b/protocols/Facebook/src/server.cpp @@ -65,24 +65,8 @@ void FacebookProto::OnLoggedIn() m_impl.m_heartBeat.Start(60000); // connect message queue - JSONNode query; - query << INT_PARAM("delta_batch_size", 125) << INT_PARAM("max_deltas_able_to_process", 1000) << INT_PARAM("sync_api_version", 3) << CHAR_PARAM("encoding", "JSON"); - if (m_szSyncToken.IsEmpty()) { - JSONNode hashes; hashes.set_name("graphql_query_hashes"); hashes << CHAR_PARAM("xma_query_id", __STRINGIFY(FB_API_QUERY_XMA)); + MqttQueueConnect(); - JSONNode xma; xma.set_name(__STRINGIFY(FB_API_QUERY_XMA)); xma << CHAR_PARAM("xma_id", "<ID>"); - JSONNode hql; hql.set_name("graphql_query_params"); hql << xma; - - JSONNode params; params.set_name("queue_params"); - params << CHAR_PARAM("buzz_on_deltas_enabled", "false") << hashes << hql; - - query << INT64_PARAM("initial_titan_sequence_id", m_sid) << CHAR_PARAM("device_id", m_szDeviceID) << INT64_PARAM("entity_fbid", m_uid) << params; - MqttPublish("/messenger_sync_create_queue", query.write().c_str()); - } - else { - query << INT64_PARAM("last_seq_id", m_sid) << CHAR_PARAM("sync_token", m_szSyncToken); - MqttPublish("/messenger_sync_get_diffs", query.write().c_str()); - } } void FacebookProto::OnLoggedOut() @@ -198,6 +182,8 @@ bool FacebookProto::RefreshToken() void FacebookProto::ServerThread(void *) { + m_QueueCreated = false; + LBL_Begin: m_szAuthToken = getMStringA(DBKEY_TOKEN); if (m_szAuthToken.IsEmpty()) { @@ -393,6 +379,17 @@ void FacebookProto::OnPublishMessage(FbThriftReader &rdr) debugLogA("MS: <%s>", szJson.c_str()); JSONNode root = JSONNode::parse(szJson); + CMStringA errorCode = root["errorCode"].as_mstring(); + if (!errorCode.IsEmpty()) { + if (!m_QueueCreated && (errorCode == "ERROR_QUEUE_NOT_FOUND" || errorCode == "ERROR_QUEUE_LOST" )) { + m_QueueCreated = true; // prevent queue creation request from being sent twice + m_szSyncToken.Empty(); + delSetting(DBKEY_SYNC_TOKEN); + MqttQueueConnect(); + } + return; + } + CMStringW str = root["lastIssuedSeqId"].as_mstring(); if (!str.IsEmpty()) { setWString(DBKEY_SID, str); |