diff options
-rw-r--r-- | protocols/JabberG/src/jabber_strm_mgmt.cpp | 128 | ||||
-rw-r--r-- | protocols/JabberG/src/jabber_strm_mgmt.h | 6 |
2 files changed, 59 insertions, 75 deletions
diff --git a/protocols/JabberG/src/jabber_strm_mgmt.cpp b/protocols/JabberG/src/jabber_strm_mgmt.cpp index a081570cfe..4f96076ac7 100644 --- a/protocols/JabberG/src/jabber_strm_mgmt.cpp +++ b/protocols/JabberG/src/jabber_strm_mgmt.cpp @@ -48,7 +48,6 @@ void strm_mgmt::OnProcessEnabled(const TiXmlElement *node, ThreadData * /*info*/ }
//TODO: handle 'location'
m_nLocalHCount = 0;
- m_nSrvHCount = 0;
}
void strm_mgmt::OnProcessResumed(const TiXmlElement *node, ThreadData * /*info*/)
@@ -63,33 +62,14 @@ void strm_mgmt::OnProcessResumed(const TiXmlElement *node, ThreadData * /*info*/ if (m_sResumeId != var)
return; //TODO: unknown session, what we should do ?
- var = XmlGetAttr(node, "h");
- if (!var)
- return;
-
m_bSessionResumed = m_bEnabled = true;
m_bPendingEnable = false;
m_tConnLostTime = 0;
- m_nSrvHCount = atoi(var);
- int size = m_nLocalSCount - m_nSrvHCount;
//FinishLoginProcess(info);
proto->OnLoggedIn();
proto->ProtoBroadcastAck(0, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)proto->m_iStatus, proto->m_iDesiredStatus);
-
- if (size < 0) {
- proto->debugLogA("strm_mgmt: error: locally sent nodes count %d, server side received count %d", m_nLocalSCount, m_nSrvHCount);
- m_nLocalSCount = m_nSrvHCount; //temporary workaround
- //TODO: this should never happen, indicates server side bug
- //TODO: once our client side implementation good enough, abort stream in this case, noop for now
- }
- else if (size > 0 && !NodeCache.empty()) //TODO: NodeCache cannot be empty if size >0, it's a bug
- ResendNodes(size);
- else {
- for (auto i : NodeCache)
- xmlStorage.DeleteNode(i);
- NodeCache.clear();
- }
+ ProcessCache(node->IntAttribute("h", -1), true);
}
void strm_mgmt::OnProcessSMa(const TiXmlElement *node)
@@ -97,50 +77,43 @@ void strm_mgmt::OnProcessSMa(const TiXmlElement *node) if (mir_strcmp(XmlGetAttr(node, "xmlns"), "urn:xmpp:sm:3"))
return;
- m_nSrvHCount = node->IntAttribute("h");
- proto->debugLogA("strm_mgmt: info: locally sent nodes count %d, server side received count %d", m_nLocalSCount, m_nSrvHCount);
- int size = m_nLocalSCount - m_nSrvHCount;
- if (size < 0) {
- proto->debugLogA("strm_mgmt: error: locally sent nodes count %d, server side received count %d", m_nLocalSCount, m_nSrvHCount);
- m_nLocalSCount = m_nSrvHCount; //temporary workaround
- //TODO: this should never happen, indicates server side bug
- //TODO: once our client side implementation good enough, abort stream in this case, noop for now
- }
- else if (size > 0 && !NodeCache.empty()) //TODO: NodeCache cannot be empty if size >0, it's a bug
- ResendNodes(size);
- else {
- for (auto i : NodeCache)
- xmlStorage.DeleteNode(i);
- NodeCache.clear();
- }
+ if (!m_bRequestPending)
+ return;
+ m_bRequestPending = false;
+ ProcessCache(node->IntAttribute("h", -1), false);
}
-void strm_mgmt::ResendNodes(uint32_t size)
+void strm_mgmt::ProcessCache(uint32_t nSrvHCount, bool resuming)
{
- proto->debugLogA("strm_mgmt: info: resending %d missed nodes", size);
- if (size < NodeCache.size()) {
- proto->debugLogA("strm_mgmt: info: resending nodes: need to resend %d nodes, nodes in cache %d, cleaning cache to match resending node count", size, NodeCache.size());
- const size_t diff = NodeCache.size() - size;
- if (diff) {
- size_t diff_tmp = diff;
- for (auto i : NodeCache) {
- if (diff_tmp > 0) {
- xmlStorage.DeleteNode(i);
- diff_tmp--;
- }
- }
- diff_tmp = diff;
- while (diff_tmp) {
- NodeCache.pop_front();
- diff_tmp--;
- }
- }
+ int lost = m_nReqLocalSCount - nSrvHCount;
+ int untracked = m_nLocalSCount - m_nReqLocalSCount;
+
+ if (nSrvHCount < 0 || m_nReqLocalSCount < 0
+ || lost < 0 || untracked < 0 || lost + untracked > NodeCache.size()) {
+ proto->debugLogA("strm_mgmt: error: a bug or wrong ack node, resetting cache");
+ for (auto it : NodeCache)
+ xmlStorage.DeleteNode(it);
+ NodeCache.clear();
+ return;
}
- std::list<TiXmlElement *> tmp_list = NodeCache;
- NodeCache.clear();
- m_nLocalSCount = m_nSrvHCount; //we have handled missed nodes, set our counter to match server side value
- for (auto i : tmp_list)
- proto->m_ThreadInfo->send(i);
+
+ proto->debugLogA("strm_mgmt: info: lost %d, untracked %d, cache_size %d", lost, untracked, NodeCache.size());
+
+ //delete ack'ed nodes from the cache
+ int todelete = (int)NodeCache.size() - (lost + untracked);
+ for (int i = 0; i < todelete; i++) {
+ xmlStorage.DeleteNode(NodeCache.front());
+ NodeCache.pop_front();
+ }
+
+ int toresend = resuming ? lost + untracked : lost;
+ for (int i = 0; i < toresend; i++) {
+ TiXmlElement *it = NodeCache.front();
+ proto->m_ThreadInfo->send(it);
+ xmlStorage.DeleteNode(it);
+ NodeCache.pop_front();
+ }
+ proto->debugLogA("strm_mgmt: info: deleting %d nodes, resending %d nodes", todelete, toresend);
}
void strm_mgmt::OnProcessSMr(const TiXmlElement *node)
@@ -156,7 +129,7 @@ void strm_mgmt::OnProcessFailed(const TiXmlElement *node, ThreadData *info) //us proto->debugLogA("strm_mgmt: error: Failed to resume session %s", m_sResumeId.c_str());
- m_bEnabled = false;
+ m_bEnabled = m_bHalfEnabled = false;
m_bSessionResumed = false;
m_sResumeId.clear();
@@ -192,7 +165,7 @@ void strm_mgmt::CheckState() void strm_mgmt::HandleOutgoingNode(TiXmlElement *node)
{
- if (!m_bEnabled)
+ if (!m_bHalfEnabled)
return;
auto *pNodeCopy = node->DeepClone(&xmlStorage)->ToElement();
@@ -201,19 +174,20 @@ void strm_mgmt::HandleOutgoingNode(TiXmlElement *node) m_nLocalSCount++;
NodeCache.push_back(pNodeCopy);
- if ((m_nLocalSCount - m_nSrvHCount) >= CACHE_SIZE || m_nLocalSCount % 3 == 0)
+ if (NodeCache.size() >= CACHE_SIZE)
RequestAck();
}
void strm_mgmt::ResetState()
{
// reset state of stream management
- m_bEnabled = m_bPendingEnable = false;
+ m_bEnabled = m_bPendingEnable = m_bHalfEnabled = m_bRequestPending = false;
+ m_nReqLocalSCount = 0;
m_nResumeMaxSeconds = 0;
m_tConnLostTime = 0;
// reset stream management h counters
- m_nLocalHCount = m_nLocalSCount = m_nSrvHCount = 0;
+ m_nLocalHCount = m_nLocalSCount = 0;
// clear resume id
m_sResumeId.clear();
@@ -221,7 +195,8 @@ void strm_mgmt::ResetState() void strm_mgmt::HandleConnectionLost()
{
- m_bEnabled = false;
+ m_bEnabled = m_bHalfEnabled = m_bRequestPending = false;
+ m_nReqLocalSCount = 0;
m_tConnLostTime = time(0);
}
@@ -245,13 +220,14 @@ void strm_mgmt::EnableStrmMgmt() {
if (m_bEnabled)
return;
+ m_bHalfEnabled = true;
if (m_sResumeId.empty()) {
XmlNode enable_sm("enable");
XmlAddAttr(enable_sm, "xmlns", "urn:xmpp:sm:3");
XmlAddAttr(enable_sm, "resume", "true"); // enable resumption (most useful part of this xep)
proto->m_ThreadInfo->send(enable_sm);
- m_nLocalSCount = 1; // TODO: this MUST be 0, i have bug somewhere, feel free to fix it.
+ m_nLocalSCount = 0;
}
else { // resume session
XmlNode enable_sm("resume");
@@ -266,9 +242,9 @@ void strm_mgmt::SendAck() return;
proto->debugLogA("strm_mgmt: info: sending ack: locally received node count %d", m_nLocalHCount);
- XmlNode enable_sm("a");
- enable_sm << XATTR("xmlns", "urn:xmpp:sm:3") << XATTRI("h", m_nLocalHCount);
- proto->m_ThreadInfo->send_no_strm_mgmt(enable_sm);
+ XmlNode ack_node("a");
+ ack_node << XATTR("xmlns", "urn:xmpp:sm:3") << XATTRI("h", m_nLocalHCount);
+ proto->m_ThreadInfo->send_no_strm_mgmt(ack_node);
}
void strm_mgmt::RequestAck()
@@ -276,8 +252,14 @@ void strm_mgmt::RequestAck() if (!m_bEnabled)
return;
- XmlNode enable_sm("r"); enable_sm << XATTR("xmlns", "urn:xmpp:sm:3");
- proto->m_ThreadInfo->send_no_strm_mgmt(enable_sm);
+ if (!m_bRequestPending) {
+ // We should save m_nLocalSCount here bc the server acknowlages stanza count for the moment when it receives <r>
+ // NOT for the moment it sends <a>
+ m_bRequestPending = true;
+ m_nReqLocalSCount = m_nLocalSCount;
+ XmlNode req_node("r"); req_node << XATTR("xmlns", "urn:xmpp:sm:3");
+ proto->m_ThreadInfo->send_no_strm_mgmt(req_node);
+ }
}
bool strm_mgmt::IsSessionResumed()
diff --git a/protocols/JabberG/src/jabber_strm_mgmt.h b/protocols/JabberG/src/jabber_strm_mgmt.h index 158c72fefb..fc4f898af3 100644 --- a/protocols/JabberG/src/jabber_strm_mgmt.h +++ b/protocols/JabberG/src/jabber_strm_mgmt.h @@ -33,18 +33,20 @@ class strm_mgmt {
void OnProcessSMa(const TiXmlElement *node);
void OnProcessSMr(const TiXmlElement *node);
- void ResendNodes(uint32_t count);
void FinishLoginProcess(ThreadData *info);
+ void ProcessCache(uint32_t nSrvHCount, bool resuming);
CJabberProto *proto;
TiXmlDocument xmlStorage;
bool m_bEnabled;
+ bool m_bHalfEnabled;
bool m_bPendingEnable;
bool m_bResumeSupported = false;
bool m_bSessionResumed = false;
+ bool m_bRequestPending = false;
- int m_nSrvHCount, m_nLocalHCount, m_nLocalSCount, m_nResumeMaxSeconds;
+ int m_nLocalHCount, m_nLocalSCount, m_nReqLocalSCount, m_nResumeMaxSeconds;
time_t m_tConnLostTime;
std::string m_sResumeId;
std::list<TiXmlElement*> NodeCache;
|