From 5df0729e23a1c38bdda34c15fdce3810475c0e4f Mon Sep 17 00:00:00 2001 From: George Hazan Date: Thu, 16 Mar 2023 17:49:18 +0300 Subject: Jabber: stream management fix --- protocols/JabberG/src/jabber_strm_mgmt.cpp | 128 +++++++++++++---------------- protocols/JabberG/src/jabber_strm_mgmt.h | 6 +- 2 files changed, 59 insertions(+), 75 deletions(-) diff --git a/protocols/JabberG/src/jabber_strm_mgmt.cpp b/protocols/JabberG/src/jabber_strm_mgmt.cpp index a081570cfe..4f96076ac7 100644 --- a/protocols/JabberG/src/jabber_strm_mgmt.cpp +++ b/protocols/JabberG/src/jabber_strm_mgmt.cpp @@ -48,7 +48,6 @@ void strm_mgmt::OnProcessEnabled(const TiXmlElement *node, ThreadData * /*info*/ } //TODO: handle 'location' m_nLocalHCount = 0; - m_nSrvHCount = 0; } void strm_mgmt::OnProcessResumed(const TiXmlElement *node, ThreadData * /*info*/) @@ -63,33 +62,14 @@ void strm_mgmt::OnProcessResumed(const TiXmlElement *node, ThreadData * /*info*/ if (m_sResumeId != var) return; //TODO: unknown session, what we should do ? - var = XmlGetAttr(node, "h"); - if (!var) - return; - m_bSessionResumed = m_bEnabled = true; m_bPendingEnable = false; m_tConnLostTime = 0; - m_nSrvHCount = atoi(var); - int size = m_nLocalSCount - m_nSrvHCount; //FinishLoginProcess(info); proto->OnLoggedIn(); proto->ProtoBroadcastAck(0, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)proto->m_iStatus, proto->m_iDesiredStatus); - - if (size < 0) { - proto->debugLogA("strm_mgmt: error: locally sent nodes count %d, server side received count %d", m_nLocalSCount, m_nSrvHCount); - m_nLocalSCount = m_nSrvHCount; //temporary workaround - //TODO: this should never happen, indicates server side bug - //TODO: once our client side implementation good enough, abort stream in this case, noop for now - } - else if (size > 0 && !NodeCache.empty()) //TODO: NodeCache cannot be empty if size >0, it's a bug - ResendNodes(size); - else { - for (auto i : NodeCache) - xmlStorage.DeleteNode(i); - NodeCache.clear(); - } + ProcessCache(node->IntAttribute("h", -1), true); } void strm_mgmt::OnProcessSMa(const TiXmlElement *node) @@ -97,50 +77,43 @@ void strm_mgmt::OnProcessSMa(const TiXmlElement *node) if (mir_strcmp(XmlGetAttr(node, "xmlns"), "urn:xmpp:sm:3")) return; - m_nSrvHCount = node->IntAttribute("h"); - proto->debugLogA("strm_mgmt: info: locally sent nodes count %d, server side received count %d", m_nLocalSCount, m_nSrvHCount); - int size = m_nLocalSCount - m_nSrvHCount; - if (size < 0) { - proto->debugLogA("strm_mgmt: error: locally sent nodes count %d, server side received count %d", m_nLocalSCount, m_nSrvHCount); - m_nLocalSCount = m_nSrvHCount; //temporary workaround - //TODO: this should never happen, indicates server side bug - //TODO: once our client side implementation good enough, abort stream in this case, noop for now - } - else if (size > 0 && !NodeCache.empty()) //TODO: NodeCache cannot be empty if size >0, it's a bug - ResendNodes(size); - else { - for (auto i : NodeCache) - xmlStorage.DeleteNode(i); - NodeCache.clear(); - } + if (!m_bRequestPending) + return; + m_bRequestPending = false; + ProcessCache(node->IntAttribute("h", -1), false); } -void strm_mgmt::ResendNodes(uint32_t size) +void strm_mgmt::ProcessCache(uint32_t nSrvHCount, bool resuming) { - proto->debugLogA("strm_mgmt: info: resending %d missed nodes", size); - if (size < NodeCache.size()) { - proto->debugLogA("strm_mgmt: info: resending nodes: need to resend %d nodes, nodes in cache %d, cleaning cache to match resending node count", size, NodeCache.size()); - const size_t diff = NodeCache.size() - size; - if (diff) { - size_t diff_tmp = diff; - for (auto i : NodeCache) { - if (diff_tmp > 0) { - xmlStorage.DeleteNode(i); - diff_tmp--; - } - } - diff_tmp = diff; - while (diff_tmp) { - NodeCache.pop_front(); - diff_tmp--; - } - } + int lost = m_nReqLocalSCount - nSrvHCount; + int untracked = m_nLocalSCount - m_nReqLocalSCount; + + if (nSrvHCount < 0 || m_nReqLocalSCount < 0 + || lost < 0 || untracked < 0 || lost + untracked > NodeCache.size()) { + proto->debugLogA("strm_mgmt: error: a bug or wrong ack node, resetting cache"); + for (auto it : NodeCache) + xmlStorage.DeleteNode(it); + NodeCache.clear(); + return; } - std::list tmp_list = NodeCache; - NodeCache.clear(); - m_nLocalSCount = m_nSrvHCount; //we have handled missed nodes, set our counter to match server side value - for (auto i : tmp_list) - proto->m_ThreadInfo->send(i); + + proto->debugLogA("strm_mgmt: info: lost %d, untracked %d, cache_size %d", lost, untracked, NodeCache.size()); + + //delete ack'ed nodes from the cache + int todelete = (int)NodeCache.size() - (lost + untracked); + for (int i = 0; i < todelete; i++) { + xmlStorage.DeleteNode(NodeCache.front()); + NodeCache.pop_front(); + } + + int toresend = resuming ? lost + untracked : lost; + for (int i = 0; i < toresend; i++) { + TiXmlElement *it = NodeCache.front(); + proto->m_ThreadInfo->send(it); + xmlStorage.DeleteNode(it); + NodeCache.pop_front(); + } + proto->debugLogA("strm_mgmt: info: deleting %d nodes, resending %d nodes", todelete, toresend); } void strm_mgmt::OnProcessSMr(const TiXmlElement *node) @@ -156,7 +129,7 @@ void strm_mgmt::OnProcessFailed(const TiXmlElement *node, ThreadData *info) //us proto->debugLogA("strm_mgmt: error: Failed to resume session %s", m_sResumeId.c_str()); - m_bEnabled = false; + m_bEnabled = m_bHalfEnabled = false; m_bSessionResumed = false; m_sResumeId.clear(); @@ -192,7 +165,7 @@ void strm_mgmt::CheckState() void strm_mgmt::HandleOutgoingNode(TiXmlElement *node) { - if (!m_bEnabled) + if (!m_bHalfEnabled) return; auto *pNodeCopy = node->DeepClone(&xmlStorage)->ToElement(); @@ -201,19 +174,20 @@ void strm_mgmt::HandleOutgoingNode(TiXmlElement *node) m_nLocalSCount++; NodeCache.push_back(pNodeCopy); - if ((m_nLocalSCount - m_nSrvHCount) >= CACHE_SIZE || m_nLocalSCount % 3 == 0) + if (NodeCache.size() >= CACHE_SIZE) RequestAck(); } void strm_mgmt::ResetState() { // reset state of stream management - m_bEnabled = m_bPendingEnable = false; + m_bEnabled = m_bPendingEnable = m_bHalfEnabled = m_bRequestPending = false; + m_nReqLocalSCount = 0; m_nResumeMaxSeconds = 0; m_tConnLostTime = 0; // reset stream management h counters - m_nLocalHCount = m_nLocalSCount = m_nSrvHCount = 0; + m_nLocalHCount = m_nLocalSCount = 0; // clear resume id m_sResumeId.clear(); @@ -221,7 +195,8 @@ void strm_mgmt::ResetState() void strm_mgmt::HandleConnectionLost() { - m_bEnabled = false; + m_bEnabled = m_bHalfEnabled = m_bRequestPending = false; + m_nReqLocalSCount = 0; m_tConnLostTime = time(0); } @@ -245,13 +220,14 @@ void strm_mgmt::EnableStrmMgmt() { if (m_bEnabled) return; + m_bHalfEnabled = true; if (m_sResumeId.empty()) { XmlNode enable_sm("enable"); XmlAddAttr(enable_sm, "xmlns", "urn:xmpp:sm:3"); XmlAddAttr(enable_sm, "resume", "true"); // enable resumption (most useful part of this xep) proto->m_ThreadInfo->send(enable_sm); - m_nLocalSCount = 1; // TODO: this MUST be 0, i have bug somewhere, feel free to fix it. + m_nLocalSCount = 0; } else { // resume session XmlNode enable_sm("resume"); @@ -266,9 +242,9 @@ void strm_mgmt::SendAck() return; proto->debugLogA("strm_mgmt: info: sending ack: locally received node count %d", m_nLocalHCount); - XmlNode enable_sm("a"); - enable_sm << XATTR("xmlns", "urn:xmpp:sm:3") << XATTRI("h", m_nLocalHCount); - proto->m_ThreadInfo->send_no_strm_mgmt(enable_sm); + XmlNode ack_node("a"); + ack_node << XATTR("xmlns", "urn:xmpp:sm:3") << XATTRI("h", m_nLocalHCount); + proto->m_ThreadInfo->send_no_strm_mgmt(ack_node); } void strm_mgmt::RequestAck() @@ -276,8 +252,14 @@ void strm_mgmt::RequestAck() if (!m_bEnabled) return; - XmlNode enable_sm("r"); enable_sm << XATTR("xmlns", "urn:xmpp:sm:3"); - proto->m_ThreadInfo->send_no_strm_mgmt(enable_sm); + if (!m_bRequestPending) { + // We should save m_nLocalSCount here bc the server acknowlages stanza count for the moment when it receives + // NOT for the moment it sends + m_bRequestPending = true; + m_nReqLocalSCount = m_nLocalSCount; + XmlNode req_node("r"); req_node << XATTR("xmlns", "urn:xmpp:sm:3"); + proto->m_ThreadInfo->send_no_strm_mgmt(req_node); + } } bool strm_mgmt::IsSessionResumed() diff --git a/protocols/JabberG/src/jabber_strm_mgmt.h b/protocols/JabberG/src/jabber_strm_mgmt.h index 158c72fefb..fc4f898af3 100644 --- a/protocols/JabberG/src/jabber_strm_mgmt.h +++ b/protocols/JabberG/src/jabber_strm_mgmt.h @@ -33,18 +33,20 @@ class strm_mgmt { void OnProcessSMa(const TiXmlElement *node); void OnProcessSMr(const TiXmlElement *node); - void ResendNodes(uint32_t count); void FinishLoginProcess(ThreadData *info); + void ProcessCache(uint32_t nSrvHCount, bool resuming); CJabberProto *proto; TiXmlDocument xmlStorage; bool m_bEnabled; + bool m_bHalfEnabled; bool m_bPendingEnable; bool m_bResumeSupported = false; bool m_bSessionResumed = false; + bool m_bRequestPending = false; - int m_nSrvHCount, m_nLocalHCount, m_nLocalSCount, m_nResumeMaxSeconds; + int m_nLocalHCount, m_nLocalSCount, m_nReqLocalSCount, m_nResumeMaxSeconds; time_t m_tConnLostTime; std::string m_sResumeId; std::list NodeCache; -- cgit v1.2.3