From a706ec44ad2e6260f9e2879df62762e1f1f21ec7 Mon Sep 17 00:00:00 2001 From: Gluzskiy Alexandr <sss@sss.chaoslab.ru> Date: Sun, 1 Apr 2018 18:23:57 +0300 Subject: protocols: jabber: xep-0198 - few logic bugs fixed - started resumption implementation --- protocols/JabberG/src/jabber_strm_mgmt.cpp | 135 ++++++++++++++++++++--------- protocols/JabberG/src/jabber_strm_mgmt.h | 4 +- protocols/JabberG/src/jabber_thread.cpp | 2 + 3 files changed, 98 insertions(+), 43 deletions(-) (limited to 'protocols') diff --git a/protocols/JabberG/src/jabber_strm_mgmt.cpp b/protocols/JabberG/src/jabber_strm_mgmt.cpp index aff05bf828..f6227a803c 100755 --- a/protocols/JabberG/src/jabber_strm_mgmt.cpp +++ b/protocols/JabberG/src/jabber_strm_mgmt.cpp @@ -25,7 +25,8 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. strm_mgmt::strm_mgmt(CJabberProto *_proto) : proto(_proto), m_bStrmMgmtPendingEnable(false), m_bStrmMgmtEnabled(false), -m_bStrmMgmtResumeSupported(false) +m_bStrmMgmtResumeSupported(false), +bSessionResumed(false) { } @@ -34,23 +35,58 @@ void strm_mgmt::OnProcessEnabled(HXML node, ThreadData * /*info*/) { m_bStrmMgmtEnabled = true; auto val = XmlGetAttrValue(node, L"max"); - m_nStrmMgmtResumeMaxSeconds = _wtoi(val); + if(val) + m_nStrmMgmtResumeMaxSeconds = _wtoi(val); val = XmlGetAttrValue(node, L"resume"); - if (mir_wstrcmp(val, L"true") || mir_wstrcmp(val, L"1")) - m_bStrmMgmtResumeSupported = true; - - m_sStrmMgmtResumeId = XmlGetAttrValue(node, L"id"); + if (val) + { + if (mir_wstrcmp(val, L"true") || mir_wstrcmp(val, L"1")) + { + m_bStrmMgmtResumeSupported = true; + m_sStrmMgmtResumeId = XmlGetAttrValue(node, L"id"); + } + } m_nStrmMgmtLocalHCount = 0; m_nStrmMgmtSrvHCount = 0; } +void strm_mgmt::OnProcessResumed(HXML node, ThreadData * /*info*/) +{ + if (!mir_wstrcmp(XmlGetAttrValue(node, L"xmlns"), L"urn:xmpp:sm:3")) + { + auto var = XmlGetAttrValue(node, L"previd"); + if (!var) + return; + if (m_sStrmMgmtResumeId != var) + return; //TODO: unknown session, what we should do ? + var = XmlGetAttrValue(node, L"h"); + if (!var) + return; + bSessionResumed = true; + m_nStrmMgmtSrvHCount = _wtoi(var); + int size = m_nStrmMgmtLocalSCount - m_nStrmMgmtSrvHCount; + if (size < 0) + { + //TODO: this should never happen, indicates server side bug + //TODO: once our client side implementation good enough, abort stream in this case, noop for now + } + else if (size > 0 && !NodeCache.empty()) //TODO: NodeCache cannot be empty if size >0, it's a bug + ResendNodes(size); + else + { + for (auto i : NodeCache) + xmlFree(i); + NodeCache.clear(); + } + } +} + void strm_mgmt::OnProcessSMa(HXML node) { if (!mir_wstrcmp(XmlGetAttrValue(node, L"xmlns"), L"urn:xmpp:sm:3")) { auto val = XmlGetAttrValue(node, L"h"); - uint32_t iVal = _wtoi(val); - m_nStrmMgmtSrvHCount = iVal; + m_nStrmMgmtSrvHCount = _wtoi(val); int size = m_nStrmMgmtLocalSCount - m_nStrmMgmtSrvHCount; if (size < 0) { @@ -58,44 +94,48 @@ void strm_mgmt::OnProcessSMa(HXML node) //TODO: once our client side implementation good enough, abort stream in this case, noop for now } else if (size > 0 && !NodeCache.empty()) //TODO: NodeCache cannot be empty if size >0, it's a bug + ResendNodes(size); + else { - if (size <= NodeCache.size()) //TODO: size should not be larger than NodeCache.size(), another bug + for (auto i : NodeCache) + xmlFree(i); + NodeCache.clear(); + } + } +} + +void strm_mgmt::ResendNodes(uint32_t size) +{ + if (size > NodeCache.size()) //TODO: size should not be larger than NodeCache.size(), another bug + { + const size_t diff = NodeCache.size() - size; + if (diff) + { + size_t diff_tmp = diff; + for (auto i : NodeCache) { - const size_t diff = NodeCache.size() - size; - if (diff) + if (diff_tmp > 0) { - size_t diff_tmp = diff; - for (auto i : NodeCache) - { - if (diff_tmp > 0) - { - xmlFree(i); - diff_tmp--; - } - } - diff_tmp = diff; - while (diff_tmp) - { - NodeCache.pop_front(); - diff_tmp--; - } + xmlFree(i); + diff_tmp--; } } - std::list<HXML> tmp_list = NodeCache; - NodeCache.clear(); - for (auto i : tmp_list) + diff_tmp = diff; + while (diff_tmp) { - proto->m_ThreadInfo->send_no_strm_mgmt(i); //freed by send ? - //xmlFree(i); + NodeCache.pop_front(); + diff_tmp--; } } - else - { - for (auto i : NodeCache) - xmlFree(i); - NodeCache.clear(); - } } + std::list<HXML> tmp_list = NodeCache; + NodeCache.clear(); + for (auto i : tmp_list) + { + proto->m_ThreadInfo->send_no_strm_mgmt(i); //freed by send ? + //xmlFree(i); + } + } void strm_mgmt::OnProcessSMr(HXML node) @@ -169,11 +209,22 @@ void strm_mgmt::HandleIncommingNode(HXML node) void strm_mgmt::EnableStrmMgmt() { - XmlNode enable_sm(L"enable"); - XmlAddAttr(enable_sm, L"xmlns", L"urn:xmpp:sm:3"); - XmlAddAttr(enable_sm, L"resume", L"true"); //enable resumption (most useful part of this xep) - proto->m_ThreadInfo->send(enable_sm); - m_nStrmMgmtLocalSCount = 1; //TODO: this MUST be 0, i have bug somewhere. + if (m_sStrmMgmtResumeId.empty()) + { + XmlNode enable_sm(L"enable"); + XmlAddAttr(enable_sm, L"xmlns", L"urn:xmpp:sm:3"); + XmlAddAttr(enable_sm, L"resume", L"true"); //enable resumption (most useful part of this xep) + proto->m_ThreadInfo->send(enable_sm); + m_nStrmMgmtLocalSCount = 1; //TODO: this MUST be 0, i have bug somewhere. + } + else + { + XmlNode enable_sm(L"resume"); + XmlAddAttr(enable_sm, L"xmlns", L"urn:xmpp:sm:3"); + xmlAddAttrInt(enable_sm, L"h", m_nStrmMgmtLocalHCount); + XmlAddAttr(enable_sm, L"previd", m_sStrmMgmtResumeId.c_str()); + proto->m_ThreadInfo->send(enable_sm); + } } void strm_mgmt::SendAck() diff --git a/protocols/JabberG/src/jabber_strm_mgmt.h b/protocols/JabberG/src/jabber_strm_mgmt.h index 2bb770df77..10a659cde3 100755 --- a/protocols/JabberG/src/jabber_strm_mgmt.h +++ b/protocols/JabberG/src/jabber_strm_mgmt.h @@ -31,12 +31,13 @@ class strm_mgmt { void OnProcessSMa(HXML node); void OnProcessSMr(HXML node); + void ResendNodes(uint32_t count); CJabberProto *proto; uint32_t m_nStrmMgmtSrvHCount, m_nStrmMgmtLocalHCount, m_nStrmMgmtLocalSCount, m_nStrmMgmtResumeMaxSeconds; const uint32_t m_nStrmMgmtCacheSize = 10; - bool m_bStrmMgmtPendingEnable, m_bStrmMgmtEnabled, m_bStrmMgmtResumeSupported; + bool m_bStrmMgmtPendingEnable, m_bStrmMgmtEnabled, m_bStrmMgmtResumeSupported, bSessionResumed; std::wstring m_sStrmMgmtResumeId; std::list<HXML> NodeCache; @@ -46,6 +47,7 @@ public: void HandleOutgoingNode(HXML node); void HandleIncommingNode(HXML node); void OnProcessEnabled(HXML node, ThreadData *info); + void OnProcessResumed(HXML node, ThreadData *info); void OnProcessFailed(HXML node, ThreadData * info); void CheckStreamFeatures(HXML node); void CheckState(); diff --git a/protocols/JabberG/src/jabber_thread.cpp b/protocols/JabberG/src/jabber_thread.cpp index d79a2d72e0..77f7ab0d7d 100755 --- a/protocols/JabberG/src/jabber_thread.cpp +++ b/protocols/JabberG/src/jabber_thread.cpp @@ -900,6 +900,8 @@ void CJabberProto::OnProcessProtocol(HXML node, ThreadData *info) OnProcessFailed(node, info); else if (!mir_wstrcmp(XmlGetName(node), L"enabled")) OnProcessEnabled(node, info); + else if (m_bEnableStreamMgmt && !mir_wstrcmp(XmlGetName(node), L"resumed")) + m_StrmMgmt.OnProcessResumed(node, info); else debugLogA("Invalid top-level tag (only <message/> <presence/> and <iq/> allowed)"); } -- cgit v1.2.3