From 004f3d1f49c54bc62743a838161ac157ffc37e41 Mon Sep 17 00:00:00 2001 From: George Hazan Date: Thu, 13 Jun 2024 16:39:12 +0300 Subject: websocket internal code went to MWebSocket --- include/m_netlib.h | 68 +++++++-- libs/win32/mir_app.lib | Bin 295310 -> 298040 bytes libs/win64/mir_app.lib | Bin 295140 -> 298056 bytes protocols/Discord/src/connection.cpp | 4 +- protocols/Discord/src/gateway.cpp | 155 ++++----------------- protocols/Discord/src/proto.cpp | 4 +- protocols/Discord/src/proto.h | 16 +-- protocols/Discord/src/voice_client.cpp | 159 +++------------------- protocols/Steam/src/steam_login.cpp | 2 +- protocols/Steam/src/steam_proto.cpp | 2 +- protocols/Steam/src/steam_proto.h | 5 +- protocols/Steam/src/steam_server.cpp | 117 ++-------------- protocols/Steam/src/steam_utils.cpp | 4 +- protocols/WhatsApp/src/message.cpp | 2 +- protocols/WhatsApp/src/proto.cpp | 8 +- protocols/WhatsApp/src/proto.h | 7 +- protocols/WhatsApp/src/server.cpp | 152 +++------------------ protocols/WhatsApp/src/utils.cpp | 14 +- src/mir_app/src/mir_app.def | 16 ++- src/mir_app/src/mir_app64.def | 16 ++- src/mir_app/src/netlib_websocket.cpp | 242 ++++++++++++++++++++++++++------- 21 files changed, 382 insertions(+), 611 deletions(-) diff --git a/include/m_netlib.h b/include/m_netlib.h index f4630248cb..5157c118e5 100644 --- a/include/m_netlib.h +++ b/include/m_netlib.h @@ -755,27 +755,65 @@ EXTERN_C MIR_APP_DLL(void*) Netlib_GetTlsUnique(HNETLIBCONN nlc, int &cbLen, int ///////////////////////////////////////////////////////////////////////////////////////// // WebSocket support -struct WSHeader +class MIR_APP_EXPORT MWebSocket : public MNonCopyable { - WSHeader() - { - memset(this, 0, sizeof(*this)); - } + mir_cs m_cs; + bool m_bTerminated = false; + +protected: + HNETLIBUSER m_nlu = 0; + HNETLIBCONN m_hConn = 0; + +public: + MWebSocket(); + ~MWebSocket(); + + // packet processor + virtual void process(const uint8_t *buf, size_t cbLen) = 0; + + // connects to a WebSocket server + MHttpResponse* connect(HANDLE nlu, const char *szHost, const MHttpHeaders *pHeaders = nullptr); + + // runs a socket reading cycle + void run(); + + // terminates a reading cycle + void terminate(); + + // sends a packet to WebSocket + void sendText(const char *pData); + void sendBinary(const void *pData, size_t strLen); +}; + +class MIR_APP_EXPORT MJsonWebSocket : public MWebSocket +{ + void process(const uint8_t *buf, size_t cbLen) override; + +public: + MJsonWebSocket() {} + + virtual void process(const class JSONNode &json) = 0; +}; + +template class WebSocket : public MWebSocket +{ + T *p; + +public: + WebSocket(T *_1) : p(_1) {} - bool bIsFinal, bIsMasked; - int opCode, firstByte; - size_t payloadSize, headerSize; + void process(const uint8_t *buf, size_t cbLen) override; }; -// connects to a WebSocket server -EXTERN_C MIR_APP_DLL(MHttpResponse*) WebSocket_Connect(HNETLIBUSER, const char *szHost, const MHttpHeaders *pHeaders = nullptr); +template class JsonWebSocket : public MJsonWebSocket +{ + T *p; -// validates that the provided buffer contains full WebSocket datagram -EXTERN_C MIR_APP_DLL(bool) WebSocket_InitHeader(WSHeader &hdr, const void *pData, size_t bufSize); +public: + JsonWebSocket(T *_1) : p(_1) {} -// sends a packet to WebSocket -EXTERN_C MIR_APP_DLL(void) WebSocket_SendText(HNETLIBCONN nlc, const char *pData); -EXTERN_C MIR_APP_DLL(void) WebSocket_SendBinary(HNETLIBCONN nlc, const void *pData, size_t strLen); + void process(const JSONNode &node) override; +}; ///////////////////////////////////////////////////////////////////////////////////////// // Netlib hooks (0.8+) diff --git a/libs/win32/mir_app.lib b/libs/win32/mir_app.lib index 37b8b0a798..24ebf7eb34 100644 Binary files a/libs/win32/mir_app.lib and b/libs/win32/mir_app.lib differ diff --git a/libs/win64/mir_app.lib b/libs/win64/mir_app.lib index 80a84c2a02..e8b7a9e6f0 100644 Binary files a/libs/win64/mir_app.lib and b/libs/win64/mir_app.lib differ diff --git a/protocols/Discord/src/connection.cpp b/protocols/Discord/src/connection.cpp index 96cc32ee25..ff7e765ecb 100644 --- a/protocols/Discord/src/connection.cpp +++ b/protocols/Discord/src/connection.cpp @@ -93,8 +93,8 @@ void CDiscordProto::ShutdownSession() pMfaDialog->Close(); if (m_hWorkerThread) SetEvent(m_evRequestsQueue); - if (m_hGatewayConnection) - Netlib_Shutdown(m_hGatewayConnection); + if (m_ws) + m_ws->terminate(); if (m_hAPIConnection) Netlib_Shutdown(m_hAPIConnection); diff --git a/protocols/Discord/src/gateway.cpp b/protocols/Discord/src/gateway.cpp index dc7d1da26e..d6b7846eeb 100644 --- a/protocols/Discord/src/gateway.cpp +++ b/protocols/Discord/src/gateway.cpp @@ -22,12 +22,12 @@ along with this program. If not, see . bool CDiscordProto::GatewaySend(const JSONNode &pRoot) { - if (m_hGatewayConnection == nullptr) + if (m_ws == nullptr) return false; json_string szText = pRoot.write(); debugLogA("Gateway send: %s", szText.c_str()); - WebSocket_SendText(m_hGatewayConnection, szText.c_str()); + m_ws->sendText(szText.c_str()); return true; } @@ -51,7 +51,8 @@ bool CDiscordProto::GatewayThreadWorker() hdrs.AddHeader("Cookie", m_szWSCookie); } - NLHR_PTR pReply(WebSocket_Connect(m_hGatewayNetlibUser, m_szGateway + "/?encoding=json&v=8", &hdrs)); + JsonWebSocket ws(this); + NLHR_PTR pReply(ws.connect(m_hGatewayNetlibUser, m_szGateway + "/?encoding=json&v=8", &hdrs)); if (pReply == nullptr) { debugLogA("Gateway connection failed, exiting"); return false; @@ -72,164 +73,62 @@ bool CDiscordProto::GatewayThreadWorker() // succeeded! debugLogA("Gateway connection succeeded"); - m_hGatewayConnection = pReply->nlc; - - bool bExit = false; - int offset = 0; - MBinBuffer netbuf; - - while (!bExit) { - if (m_bTerminated) - break; - - unsigned char buf[2048]; - int bufSize = Netlib_Recv(m_hGatewayConnection, (char*)buf + offset, _countof(buf) - offset, MSG_NODUMP); - if (bufSize == 0) { - debugLogA("Gateway connection gracefully closed"); - bExit = !m_bTerminated; - break; - } - if (bufSize < 0) { - debugLogA("Gateway connection error, exiting"); - break; - } - WSHeader hdr; - if (!WebSocket_InitHeader(hdr, buf, bufSize)) { - offset += bufSize; - continue; - } - offset = 0; - - debugLogA("Got packet: buffer = %d, opcode = %d, headerSize = %d, final = %d, masked = %d", bufSize, hdr.opCode, hdr.headerSize, hdr.bIsFinal, hdr.bIsMasked); - - // we have some additional data, not only opcode - if ((size_t)bufSize > hdr.headerSize) { - size_t currPacketSize = bufSize - hdr.headerSize; - netbuf.append(buf, bufSize); - while (currPacketSize < hdr.payloadSize) { - int result = Netlib_Recv(m_hGatewayConnection, (char*)buf, _countof(buf), MSG_NODUMP); - if (result == 0) { - debugLogA("Gateway connection gracefully closed"); - bExit = !m_bTerminated; - break; - } - if (result < 0) { - debugLogA("Gateway connection error, exiting"); - break; - } - currPacketSize += result; - netbuf.append(buf, result); - } - } - - // read all payloads from the current buffer, one by one - size_t prevSize = 0; - while (true) { - switch (hdr.opCode) { - case 0: // text packet - case 1: // binary packet - case 2: // continuation - if (hdr.bIsFinal) { - // process a packet here - CMStringA szJson((char*)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize); - debugLogA("JSON received:\n%s", szJson.c_str()); - JSONNode root = JSONNode::parse(szJson); - if (root) - bExit = GatewayProcess(root); - } - break; - - case 8: // close - debugLogA("server required to exit"); - bExit = true; // simply reconnect, don't exit - break; - - case 9: // ping - debugLogA("ping received"); - Netlib_Send(m_hGatewayConnection, (char*)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0); - break; - } - - if (hdr.bIsFinal) - netbuf.remove(hdr.headerSize + hdr.payloadSize); - - if (netbuf.length() == 0) - break; - - // if we have not enough data for header, continue reading - if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length())) - break; - - // if we have not enough data for data, continue reading - if (hdr.headerSize + hdr.payloadSize > netbuf.length()) - break; - - debugLogA("Got inner packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d", netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked); - if (prevSize == netbuf.length()) { - netbuf.remove(prevSize); - debugLogA("dropping current packet, exiting"); - break; - } - - prevSize = netbuf.length(); - } - } + m_ws = &ws; + ws.run(); - Netlib_CloseHandle(m_hGatewayConnection); - m_hGatewayConnection = nullptr; - return bExit; + m_ws = nullptr; + return true; } ////////////////////////////////////////////////////////////////////////////////////// // handles server commands -bool CDiscordProto::GatewayProcess(const JSONNode &pRoot) +void JsonWebSocket::process(const JSONNode &json) { - int opCode = pRoot["op"].as_int(); + int opCode = json["op"].as_int(); switch (opCode) { case OPCODE_DISPATCH: // process incoming command { - int iSeq = pRoot["s"].as_int(); + int iSeq = json["s"].as_int(); if (iSeq != 0) - m_iGatewaySeq = iSeq; + p->m_iGatewaySeq = iSeq; + + CMStringW wszCommand = json["t"].as_mstring(); + p->debugLogA("got a server command to dispatch: %S", wszCommand.c_str()); - CMStringW wszCommand = pRoot["t"].as_mstring(); - debugLogA("got a server command to dispatch: %S", wszCommand.c_str()); - - GatewayHandlerFunc pFunc = GetHandler(wszCommand); + GatewayHandlerFunc pFunc = p->GetHandler(wszCommand); if (pFunc) - (this->*pFunc)(pRoot["d"]); + (p->*pFunc)(json["d"]); } break; case OPCODE_RECONNECT: // we need to reconnect asap - debugLogA("we need to reconnect, leaving worker thread"); - return true; + p->debugLogA("we need to reconnect, leaving worker thread"); + p->m_bTerminated = true; + return; case OPCODE_INVALID_SESSION: // session invalidated - if (pRoot["d"].as_bool()) // session can be resumed - GatewaySendResume(); + if (json["d"].as_bool()) // session can be resumed + p->GatewaySendResume(); else { Sleep(5000); // 5 seconds - recommended timeout - GatewaySendIdentify(); + p->GatewaySendIdentify(); } break; case OPCODE_HELLO: // hello - m_iHartbeatInterval = pRoot["d"]["heartbeat_interval"].as_int(); + p->m_iHartbeatInterval = json["d"]["heartbeat_interval"].as_int(); - GatewaySendIdentify(); + p->GatewaySendIdentify(); break; - + case OPCODE_HEARTBEAT_ACK: // heartbeat ack break; default: - debugLogA("ACHTUNG! Unknown opcode: %d, report it to developer", opCode); + p->debugLogA("ACHTUNG! Unknown opcode: %d, report it to developer", opCode); } - - return false; } ////////////////////////////////////////////////////////////////////////////////////// diff --git a/protocols/Discord/src/proto.cpp b/protocols/Discord/src/proto.cpp index aef8da827b..7cfeba6d56 100644 --- a/protocols/Discord/src/proto.cpp +++ b/protocols/Discord/src/proto.cpp @@ -206,8 +206,8 @@ void CDiscordProto::OnShutdown() for (auto &it : arGuilds) it->SaveToFile(); - if (m_hGatewayConnection) - Netlib_Shutdown(m_hGatewayConnection); + if (m_ws) + m_ws->terminate(); if (g_plugin.bVoiceService) CallService(MS_VOICESERVICE_UNREGISTER, (WPARAM)m_szModuleName, 0); diff --git a/protocols/Discord/src/proto.h b/protocols/Discord/src/proto.h index 5f3e88d4c9..2946e677f8 100644 --- a/protocols/Discord/src/proto.h +++ b/protocols/Discord/src/proto.h @@ -173,8 +173,9 @@ class CDiscordVoiceCall : public MZeroedObject CDiscordProto *ppro; + JsonWebSocket *m_ws; + CTimer m_timer; - HNETLIBCONN m_hConn; HNETLIBBIND m_hBind; mir_cs m_cs; bool m_bTerminated; @@ -189,10 +190,8 @@ class CDiscordVoiceCall : public MZeroedObject void onTimer(CTimer *) { - if (m_hConn) { - JSONNode d; d << INT_PARAM("", rand()); - write(3, d); - } + JSONNode d; d << INT_PARAM("", rand()); + write(3, d); } static void GetConnection(HNETLIBCONN /*hNewConnection*/, uint32_t /*dwRemoteIP*/, void *pExtra); @@ -209,7 +208,6 @@ public: return !m_bTerminated; } - bool connect(HNETLIBUSER); void write(int op, JSONNode &d); void process(const JSONNode &node); @@ -298,6 +296,7 @@ class CDiscordProto : public PROTO friend class CMfaDialog; friend class CGroupchatInviteDlg; friend class CDiscordVoiceCall; + friend class JsonWebSocket; class CDiscordProtoImpl { @@ -370,13 +369,12 @@ class CDiscordProto : public PROTO m_szWSCookie; // cookie used for establishing websocket connection HNETLIBUSER m_hGatewayNetlibUser; // the separate netlib user handle for gateways - HNETLIBCONN m_hGatewayConnection; // gateway connection - + JsonWebSocket *m_ws; + void __cdecl GatewayThread(void*); bool GatewayThreadWorker(void); bool GatewaySend(const JSONNode &pNode); - bool GatewayProcess(const JSONNode &pNode); void GatewaySendGuildInfo(CDiscordGuild *pGuild); void GatewaySendHeartbeat(void); diff --git a/protocols/Discord/src/voice_client.cpp b/protocols/Discord/src/voice_client.cpp index 09e1908007..29b051799f 100644 --- a/protocols/Discord/src/voice_client.cpp +++ b/protocols/Discord/src/voice_client.cpp @@ -32,6 +32,8 @@ CDiscordVoiceCall::CDiscordVoiceCall(CDiscordProto *pOwner) : CDiscordVoiceCall::~CDiscordVoiceCall() { + delete m_ws; + m_timer.StopSafe(); if (m_encoder) { @@ -44,50 +46,17 @@ CDiscordVoiceCall::~CDiscordVoiceCall() m_repacketizer = nullptr; } - if (m_hConn) { - Netlib_CloseHandle(m_hConn); - m_hConn = nullptr; - } - if (m_hBind) { Netlib_CloseHandle(m_hBind); m_hBind = nullptr; } } -bool CDiscordVoiceCall::connect(HNETLIBUSER hServer) -{ - int nLoops = 0; - time_t lastLoopTime = time(0); - - while (true) { - time_t currTime = time(0); - if (currTime - lastLoopTime > 3) - nLoops = 0; - - nLoops++; - if (nLoops > 5) - break; - - lastLoopTime = currTime; - - MHttpHeaders hdrs; - hdrs.AddHeader("Origin", "https://discord.com"); - - NLHR_PTR pReply(WebSocket_Connect(hServer, szEndpoint + "/?encoding=json&v=8", &hdrs)); - if (pReply && pReply->resultCode == 101) { - m_hConn = pReply->nlc; - return true; - } - - SleepEx(5000, TRUE); - } - - return false; -} - void CDiscordVoiceCall::write(int op, JSONNode &d) { + if (m_ws == nullptr) + return; + d.set_name("d"); JSONNode payload; @@ -97,7 +66,12 @@ void CDiscordVoiceCall::write(int op, JSONNode &d) ppro->debugLogA("Voice JSON sent: %s", json.c_str()); mir_cslock lck(m_cs); - WebSocket_SendText(m_hConn, json.c_str()); + m_ws->sendText(json.c_str()); +} + +void JsonWebSocket::process(const JSONNode &json) +{ + p->process(json); } ////////////////////////////////////////////////////////////////////////////////////////// @@ -186,111 +160,20 @@ void CDiscordProto::VoiceClientThread(void *param) auto *pCall = (CDiscordVoiceCall *)param; debugLogA("Entering voice websocket thread"); - if (!pCall->connect(m_hGatewayNetlibUser)) { + MHttpHeaders hdrs; + hdrs.AddHeader("Origin", "https://discord.com"); + + JsonWebSocket ws(pCall); + + NLHR_PTR pReply(ws.connect(m_hGatewayNetlibUser, pCall->szEndpoint + "/?encoding=json&v=8", &hdrs)); + if (!pReply || pReply->resultCode != 101) { debugLogA("Voice gateway connection failed, exiting"); return; } - int offset = 0; - MBinBuffer netbuf; - - while (*pCall) { - if (m_bTerminated) - break; - - unsigned char buf[2048]; - int bufSize = Netlib_Recv(pCall->m_hConn, (char *)buf + offset, _countof(buf) - offset, MSG_NODUMP); - if (bufSize == 0) { - debugLogA("Voice gateway connection gracefully closed"); - pCall->m_bTerminated = !m_bTerminated; - break; - } - if (bufSize < 0) { - debugLogA("Voice gateway connection error, exiting"); - break; - } - - WSHeader hdr; - if (!WebSocket_InitHeader(hdr, buf, bufSize)) { - offset += bufSize; - continue; - } - offset = 0; - - // we have some additional data, not only opcode - if ((size_t)bufSize > hdr.headerSize) { - size_t currPacketSize = bufSize - hdr.headerSize; - netbuf.append(buf, bufSize); - while (currPacketSize < hdr.payloadSize) { - int result = Netlib_Recv(pCall->m_hConn, (char *)buf, _countof(buf), MSG_NODUMP); - if (result == 0) { - debugLogA("Voice gateway connection gracefully closed"); - pCall->m_bTerminated = !m_bTerminated; - break; - } - if (result < 0) { - debugLogA("Voice gateway connection error, exiting"); - break; - } - currPacketSize += result; - netbuf.append(buf, result); - } - } - - // read all payloads from the current buffer, one by one - size_t prevSize = 0; - while (true) { - switch (hdr.opCode) { - case 0: // text packet - case 1: // binary packet - case 2: // continuation - if (hdr.bIsFinal) { - // process a packet here - CMStringA szJson((char *)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize); - debugLogA("Voice JSON received:\n%s", szJson.c_str()); - JSONNode root = JSONNode::parse(szJson); - if (root) - pCall->process(root); - } - break; - - case 8: // close - debugLogA("Voice server required to exit"); - pCall->m_bTerminated = true; // simply reconnect, don't exit - break; - - case 9: // ping - debugLogA("ping received"); - Netlib_Send(pCall->m_hConn, (char *)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0); - break; - } - - if (hdr.bIsFinal) - netbuf.remove(hdr.headerSize + hdr.payloadSize); - - if (netbuf.length() == 0) - break; - - // if we have not enough data for header, continue reading - if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length())) - break; - - // if we have not enough data for data, continue reading - if (hdr.headerSize + hdr.payloadSize > netbuf.length()) - break; - - if (prevSize == netbuf.length()) { - netbuf.remove(prevSize); - debugLogA("dropping current packet, exiting"); - break; - } - - prevSize = netbuf.length(); - } - } - - debugLogA("Exiting voice websocket thread"); - Netlib_CloseHandle(pCall->m_hConn); pCall->m_hConn = 0; + pCall->m_ws = &ws; + ws.run(); + pCall->m_ws = nullptr; } ///////////////////////////////////////////////////////////////////////////////////////// diff --git a/protocols/Steam/src/steam_login.cpp b/protocols/Steam/src/steam_login.cpp index ce235b635c..e7a583dac1 100644 --- a/protocols/Steam/src/steam_login.cpp +++ b/protocols/Steam/src/steam_login.cpp @@ -123,7 +123,7 @@ void CSteamProto::DeleteAuthSettings() bool CSteamProto::IsOnline() { - return m_iStatus > ID_STATUS_OFFLINE && m_hServerConn != nullptr; + return m_iStatus > ID_STATUS_OFFLINE && m_ws != nullptr; } bool CSteamProto::IsMe(const char *steamId) diff --git a/protocols/Steam/src/steam_proto.cpp b/protocols/Steam/src/steam_proto.cpp index 7cb7ec5ae5..77dc767621 100644 --- a/protocols/Steam/src/steam_proto.cpp +++ b/protocols/Steam/src/steam_proto.cpp @@ -290,7 +290,7 @@ int CSteamProto::SetStatus(int new_status) Logout(); } - else if (m_hServerConn == nullptr && !IsStatusConnecting(m_iStatus)) { + else if (m_ws == nullptr && !IsStatusConnecting(m_iStatus)) { m_iStatus = ID_STATUS_CONNECTING; ForkThread(&CSteamProto::ServerThread); diff --git a/protocols/Steam/src/steam_proto.h b/protocols/Steam/src/steam_proto.h index 7110861a58..18c077e5ae 100644 --- a/protocols/Steam/src/steam_proto.h +++ b/protocols/Steam/src/steam_proto.h @@ -66,6 +66,7 @@ class CSteamProto : public PROTO friend class CSteamOptionsMain; friend class CSteamOptionsBlockList; friend class PollRequest; + friend class WebSocket; ptrW m_password; bool m_bTerminated; @@ -87,7 +88,8 @@ class CSteamProto : public PROTO std::map m_mpOutMessages; // connection - HNETLIBCONN m_hServerConn; + WebSocket *m_ws; + void __cdecl ServerThread(void *); bool ServerThreadStub(const char *szHost); @@ -95,7 +97,6 @@ class CSteamProto : public PROTO OBJLIST m_arRequests; void ProcessMulti(const uint8_t *buf, size_t cbLen); - void ProcessPacket(const uint8_t *buf, size_t cbLen); void ProcessMessage(const uint8_t *buf, size_t cbLen); void WSSend(EMsg msgType, const ProtobufCppMessage &msg); diff --git a/protocols/Steam/src/steam_server.cpp b/protocols/Steam/src/steam_server.cpp index c2c9f1638c..8dc8274240 100644 --- a/protocols/Steam/src/steam_server.cpp +++ b/protocols/Steam/src/steam_server.cpp @@ -30,7 +30,7 @@ void __cdecl CSteamProto::ServerThread(void *) } srand(time(0)); - m_hServerConn = nullptr; + m_ws = nullptr; CMStringA szHost; do { @@ -44,7 +44,9 @@ void __cdecl CSteamProto::ServerThread(void *) bool CSteamProto::ServerThreadStub(const char *szHost) { - NLHR_PTR pReply(WebSocket_Connect(m_hNetlibUser, szHost)); + WebSocket ws(this); + + NLHR_PTR pReply(ws.connect(m_hNetlibUser, szHost)); if (pReply == nullptr) { debugLogA("websocket connection failed"); return false; @@ -55,116 +57,21 @@ bool CSteamProto::ServerThreadStub(const char *szHost) return false; } - m_hServerConn = pReply->nlc; + m_ws = &ws; + debugLogA("Websocket connection succeeded"); // Send init packets Login(); - bool bExit = false; - int offset = 0; - MBinBuffer netbuf; - - while (!bExit) { - if (m_bTerminated) - break; - - unsigned char buf[2048]; - int bufSize = Netlib_Recv(m_hServerConn, (char *)buf + offset, _countof(buf) - offset, 0); - if (bufSize == 0) { - debugLogA("Gateway connection gracefully closed"); - bExit = !m_bTerminated; - break; - } - if (bufSize < 0) { - debugLogA("Gateway connection error, exiting"); - break; - } - - WSHeader hdr; - if (!WebSocket_InitHeader(hdr, buf, bufSize)) { - offset += bufSize; - continue; - } - offset = 0; - - debugLogA("Got packet: buffer = %d, opcode = %d, headerSize = %d, final = %d, masked = %d", bufSize, hdr.opCode, hdr.headerSize, hdr.bIsFinal, hdr.bIsMasked); - - // we have some additional data, not only opcode - if ((size_t)bufSize > hdr.headerSize) { - size_t currPacketSize = bufSize - hdr.headerSize; - netbuf.append(buf, bufSize); - while (currPacketSize < hdr.payloadSize) { - int result = Netlib_Recv(m_hServerConn, (char *)buf, _countof(buf), 0); - if (result == 0) { - debugLogA("Gateway connection gracefully closed"); - bExit = !m_bTerminated; - break; - } - if (result < 0) { - debugLogA("Gateway connection error, exiting"); - break; - } - currPacketSize += result; - netbuf.append(buf, result); - } - } - - // read all payloads from the current buffer, one by one - size_t prevSize = 0; - while (true) { - switch (hdr.opCode) { - case 0: // text packet - case 1: // binary packet - case 2: // continuation - if (hdr.bIsFinal) - ProcessPacket((const uint8_t *)netbuf.data() + hdr.headerSize, hdr.payloadSize); - break; - - case 8: // close - debugLogA("server required to exit"); - bExit = true; // simply reconnect, don't exit - break; - - case 9: // ping - debugLogA("ping received"); - Netlib_Send(m_hServerConn, (char *)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0); - break; - } - - if (hdr.bIsFinal) - netbuf.remove(hdr.headerSize + hdr.payloadSize); - - if (netbuf.length() == 0) - break; - - // if we have not enough data for header, continue reading - if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length())) - break; - - // if we have not enough data for data, continue reading - if (hdr.headerSize + hdr.payloadSize > netbuf.length()) - break; - - debugLogA("Got inner packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d", netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked); - if (prevSize == netbuf.length()) { - netbuf.remove(prevSize); - debugLogA("dropping current packet, exiting"); - break; - } - - prevSize = netbuf.length(); - } - } - - Netlib_CloseHandle(m_hServerConn); - m_hServerConn = nullptr; - return bExit; + ws.run(); + m_ws = nullptr; + return false; } ///////////////////////////////////////////////////////////////////////////////////////// -void CSteamProto::ProcessPacket(const uint8_t *buf, size_t cbLen) +void WebSocket::process(const uint8_t *buf, size_t cbLen) { uint32_t dwSign = *(uint32_t *)buf; EMsg msgType = (EMsg)(dwSign & ~STEAM_PROTOCOL_MASK); @@ -172,9 +79,9 @@ void CSteamProto::ProcessPacket(const uint8_t *buf, size_t cbLen) // now process the body if (msgType == EMsg::Multi) { buf += 8; cbLen -= 8; - ProcessMulti(buf, cbLen); + p->ProcessMulti(buf, cbLen); } - else ProcessMessage(buf, cbLen); + else p->ProcessMessage(buf, cbLen); } void CSteamProto::ProcessMulti(const uint8_t *buf, size_t cbLen) diff --git a/protocols/Steam/src/steam_utils.cpp b/protocols/Steam/src/steam_utils.cpp index 1faf408f5d..ed90bc5178 100644 --- a/protocols/Steam/src/steam_utils.cpp +++ b/protocols/Steam/src/steam_utils.cpp @@ -37,14 +37,12 @@ void CSteamProto::WSSendHeader(EMsg msgType, const CMsgProtoBufHeader &hdr, cons uint32_t type = (uint32_t)msgType; type |= STEAM_PROTOCOL_MASK; hdrbuf.appendBefore(&type, sizeof(type)); - Netlib_Dump(m_hServerConn, hdrbuf.data(), hdrbuf.length(), true, 0); MBinBuffer body(protobuf_c_message_get_packed_size(&msg)); protobuf_c_message_pack(&msg, body.data()); - Netlib_Dump(m_hServerConn, body.data(), body.length(), true, 0); hdrbuf.append(body); - WebSocket_SendBinary(m_hServerConn, hdrbuf.data(), hdrbuf.length()); + m_ws->sendBinary(hdrbuf.data(), hdrbuf.length()); } void CSteamProto::WSSendService(const char *pszServiceName, const ProtobufCppMessage &msg, MsgCallback pCallback) diff --git a/protocols/WhatsApp/src/message.cpp b/protocols/WhatsApp/src/message.cpp index a664e49953..2acf0b3a4e 100644 --- a/protocols/WhatsApp/src/message.cpp +++ b/protocols/WhatsApp/src/message.cpp @@ -152,7 +152,7 @@ void WhatsAppProto::OnReceiveMessage(const WANode &node) if (WAJid(szChatId).isUser()) pszReceiptTo = szAuthor; } - else if (!m_hServerConn) + else if (!m_ws) pszReceiptType = "inactive"; SendReceipt(szChatId, pszReceiptTo, msgId, pszReceiptType); diff --git a/protocols/WhatsApp/src/proto.cpp b/protocols/WhatsApp/src/proto.cpp index 9cf48f0d6b..38071deaf6 100644 --- a/protocols/WhatsApp/src/proto.cpp +++ b/protocols/WhatsApp/src/proto.cpp @@ -223,19 +223,19 @@ int WhatsAppProto::SetStatus(int iNewStatus) if (m_iDesiredStatus == ID_STATUS_OFFLINE) { SetServerStatus(m_iDesiredStatus); - if (m_hServerConn != nullptr) - Netlib_Shutdown(m_hServerConn); + if (m_ws != nullptr) + m_ws->terminate(); m_iStatus = m_iDesiredStatus = ID_STATUS_OFFLINE; ProtoBroadcastAck(NULL, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)oldStatus, m_iStatus); } - else if (m_hServerConn == nullptr && !IsStatusConnecting(m_iStatus)) { + else if (m_ws == nullptr && !IsStatusConnecting(m_iStatus)) { m_iStatus = ID_STATUS_CONNECTING; ProtoBroadcastAck(NULL, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)oldStatus, m_iStatus); ForkThread(&WhatsAppProto::ServerThread); } - else if (m_hServerConn != nullptr) { + else if (m_ws != nullptr) { SetServerStatus(m_iDesiredStatus); m_iStatus = m_iDesiredStatus; diff --git a/protocols/WhatsApp/src/proto.h b/protocols/WhatsApp/src/proto.h index 3a7842bd1f..8170101d2f 100644 --- a/protocols/WhatsApp/src/proto.h +++ b/protocols/WhatsApp/src/proto.h @@ -164,6 +164,7 @@ struct WACollection class WANoise { friend class WhatsAppProto; + friend class WebSocket; WhatsAppProto *ppro; uint32_t readCounter = 0, writeCounter = 0; @@ -268,6 +269,7 @@ class WhatsAppProto : public PROTO friend class WANoise; friend class CWhatsAppQRDlg; friend class COptionsDlg; + friend class WebSocket; class CWhatsAppProtoImpl { @@ -347,7 +349,7 @@ class WhatsAppProto : public PROTO /// Network //////////////////////////////////////////////////////////////////////////// time_t m_lastRecvTime; - HNETLIBCONN m_hServerConn; + WebSocket *m_ws; mir_cs m_csPacketQueue; OBJLIST m_arPacketQueue; @@ -365,7 +367,6 @@ class WhatsAppProto : public PROTO void ProcessReceipt(MCONTACT hContact, const char *msgId, bool bRead); - bool WSReadPacket(const WSHeader &hdr, MBinBuffer &buf); int WSSend(const ProtobufCMessage &msg); int WSSendNode(WANode &node); int WSSendNode(WANode &node, WA_PKT_HANDLER); @@ -450,7 +451,7 @@ public: ~WhatsAppProto(); __forceinline bool isOnline() const - { return m_hServerConn != 0; + { return m_ws != 0; } __forceinline void writeStr(const char *pszSetting, const JSONNode &node) diff --git a/protocols/WhatsApp/src/server.cpp b/protocols/WhatsApp/src/server.cpp index e6305d3dd5..bf7bf142c6 100644 --- a/protocols/WhatsApp/src/server.cpp +++ b/protocols/WhatsApp/src/server.cpp @@ -27,7 +27,9 @@ void WhatsAppProto::ServerThreadWorker() MHttpHeaders hdrs; hdrs.AddHeader("Origin", "https://web.whatsapp.com"); - NLHR_PTR pReply(WebSocket_Connect(m_hNetlibUser, "web.whatsapp.com/ws/chat", &hdrs)); + WebSocket ws(this); + + NLHR_PTR pReply(ws.connect(m_hNetlibUser, "web.whatsapp.com/ws/chat", &hdrs)); if (pReply == nullptr) { debugLogA("Server connection failed, exiting"); return; @@ -41,7 +43,7 @@ void WhatsAppProto::ServerThreadWorker() m_noise->init(); debugLogA("Server connection succeeded"); - m_hServerConn = pReply->nlc; + m_ws = &ws; m_lastRecvTime = time(0); m_iPacketId = 1; @@ -55,134 +57,18 @@ void WhatsAppProto::ServerThreadWorker() msg.clienthello = &client; WSSend(msg); - MBinBuffer netbuf; - - for (m_bTerminated = false; !m_bTerminated;) { - unsigned char buf[2048]; - int bufSize = Netlib_Recv(m_hServerConn, (char *)buf, _countof(buf), MSG_NODUMP); - if (bufSize == 0) { - debugLogA("Gateway connection gracefully closed"); - break; - } - if (bufSize < 0) { - debugLogA("Gateway connection error, exiting"); - break; - } - - netbuf.append(buf, bufSize); - - WSHeader hdr; - if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length())) - continue; - - // we lack some data, let's read them - if (netbuf.length() < hdr.headerSize + hdr.payloadSize) - if (!WSReadPacket(hdr, netbuf)) - break; - - // debugLogA("Got packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d", - // netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked); - // Netlib_Dump(m_hServerConn, netbuf.data(), netbuf.length(), false, 0); - - m_lastRecvTime = time(0); - - // read all payloads from the current buffer, one by one - while (true) { - MBinBuffer currPacket; - currPacket.assign(netbuf.data() + hdr.headerSize, hdr.payloadSize); - - switch (hdr.opCode) { - case 1: // json packet - debugLogA("Text packet, skipping"); - /* - currPacket.append("", 1); // add 0 to use strchr safely - CMStringA szJson(pos, (int)dataSize); - - JSONNode root = JSONNode::parse(szJson); - if (root) { - debugLogA("JSON received:\n%s", start); - - CMStringA szPrefix(start, int(pos - start - 1)); - auto *pReq = m_arPacketQueue.find((WARequest *)&szPrefix); - if (pReq != nullptr) { - root << CHAR_PARAM("$id$", szPrefix); - } - } - } - */ - break; - - case 2: // binary packet - if (hdr.payloadSize > 32) - ProcessBinaryPacket(currPacket.data(), hdr.payloadSize); - break; - - case 8: // close - debugLogA("server required to exit"); - m_bRespawn = m_bTerminated = true; // simply reconnect, don't exit - break; - - default: - Netlib_Dump(m_hServerConn, currPacket.data(), hdr.payloadSize, false, 0); - } - - netbuf.remove(hdr.headerSize + hdr.payloadSize); - // debugLogA("%d bytes removed from network buffer, %d bytes remain", hdr.headerSize + hdr.payloadSize, netbuf.length()); - if (netbuf.length() == 0) - break; - - // if we have not enough data for header, continue reading - if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length())) { - debugLogA("not enough data for header, continue reading"); - break; - } - - // if we have not enough data for data, continue reading - if (hdr.headerSize + hdr.payloadSize > netbuf.length()) { - debugLogA("not enough place for data (%d+%d > %d), continue reading", hdr.headerSize, hdr.payloadSize, netbuf.length()); - break; - } - - debugLogA("Got inner packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d", - netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked); - } - } - - debugLogA("Server connection dropped"); - Netlib_CloseHandle(m_hServerConn); - m_hServerConn = nullptr; -} - -bool WhatsAppProto::WSReadPacket(const WSHeader &hdr, MBinBuffer &res) -{ - size_t currPacketSize = res.length() - hdr.headerSize; - - char buf[1024]; - while (currPacketSize < hdr.payloadSize) { - int result = Netlib_Recv(m_hServerConn, buf, _countof(buf), MSG_NODUMP); - if (result == 0) { - debugLogA("Gateway connection gracefully closed"); - return false; - } - if (result < 0) { - debugLogA("Gateway connection error, exiting"); - return false; - } - - currPacketSize += result; - res.append(buf, result); - } - return true; + ws.run(); + m_ws = nullptr; } ///////////////////////////////////////////////////////////////////////////////////////// // Binary data processing -void WhatsAppProto::ProcessBinaryPacket(const uint8_t *pData, size_t cbDataLen) +void WebSocket::process(const uint8_t *pData, size_t cbDataLen) { - while (size_t payloadLen = m_noise->decodeFrame(pData, cbDataLen)) { - if (m_noise->bInitFinished) { - MBinBuffer buf = m_noise->decrypt(pData, payloadLen); + while (size_t payloadLen = p->m_noise->decodeFrame(pData, cbDataLen)) { + if (p->m_noise->bInitFinished) { + MBinBuffer buf = p->m_noise->decrypt(pData, payloadLen); WAReader rdr(buf.data(), buf.length()); auto b = rdr.readInt8(); @@ -195,22 +81,22 @@ void WhatsAppProto::ProcessBinaryPacket(const uint8_t *pData, size_t cbDataLen) if (WANode *pNode = rdr.readNode()) { CMStringA szText; pNode->print(szText); - debugLogA("Got binary node:\n%s", szText.c_str()); + p->debugLogA("Got binary node:\n%s", szText.c_str()); - auto pHandler = FindPersistentHandler(*pNode); + auto pHandler = p->FindPersistentHandler(*pNode); if (pHandler) - (this->*pHandler)(*pNode); + (p->*pHandler)(*pNode); else - debugLogA("cannot handle incoming message"); + p->debugLogA("cannot handle incoming message"); delete pNode; } else { - debugLogA("wrong or broken payload"); - Netlib_Dump(m_hServerConn, pData, cbDataLen, false, 0); + p->debugLogA("wrong or broken payload"); + Netlib_Dump(m_hConn, pData, cbDataLen, false, 0); } } - else OnProcessHandshake(pData, (int)payloadLen); + else p->OnProcessHandshake(pData, (int)payloadLen); pData = (BYTE*)pData + payloadLen; cbDataLen -= payloadLen; @@ -408,8 +294,8 @@ void WhatsAppProto::ShutdownSession() debugLogA("WhatsAppProto::ShutdownSession"); // shutdown all resources - if (m_hServerConn) - Netlib_Shutdown(m_hServerConn); + if (m_ws) + m_ws->terminate(); OnLoggedOut(); } diff --git a/protocols/WhatsApp/src/utils.cpp b/protocols/WhatsApp/src/utils.cpp index ff1a5b5daf..133584014c 100644 --- a/protocols/WhatsApp/src/utils.cpp +++ b/protocols/WhatsApp/src/utils.cpp @@ -169,14 +169,14 @@ CMStringA WhatsAppProto::GenerateMessageId() int WhatsAppProto::WSSend(const ProtobufCMessage &msg) { - if (m_hServerConn == nullptr) + if (m_ws == nullptr) return -1; MBinBuffer buf(proto::Serialize(&msg)); - Netlib_Dump(m_hServerConn, buf.data(), buf.length(), true, 0); + // Netlib_Dump(m_hServerConn, buf.data(), buf.length(), true, 0); MBinBuffer payload = m_noise->encodeFrame(buf.data(), buf.length()); - WebSocket_SendBinary(m_hServerConn, payload.data(), payload.length()); + m_ws->sendBinary(payload.data(), payload.length()); return 0; } @@ -184,7 +184,7 @@ int WhatsAppProto::WSSend(const ProtobufCMessage &msg) int WhatsAppProto::WSSendNode(WANode &node) { - if (m_hServerConn == nullptr) + if (m_ws == nullptr) return 0; CMStringA szText; @@ -196,13 +196,13 @@ int WhatsAppProto::WSSendNode(WANode &node) MBinBuffer encData = m_noise->encrypt(writer.body.data(), writer.body.length()); MBinBuffer payload = m_noise->encodeFrame(encData.data(), encData.length()); - WebSocket_SendBinary(m_hServerConn, payload.data(), payload.length()); + m_ws->sendBinary(payload.data(), payload.length()); return 1; } int WhatsAppProto::WSSendNode(WANode &node, WA_PKT_HANDLER pHandler) { - if (m_hServerConn == nullptr) + if (m_ws == nullptr) return 0; CMStringA id(GenerateMessageId()); @@ -217,7 +217,7 @@ int WhatsAppProto::WSSendNode(WANode &node, WA_PKT_HANDLER pHandler) int WhatsAppProto::WSSendNode(WANode &node, WA_PKT_HANDLER_FULL pHandler, void *pUserInfo) { - if (m_hServerConn == nullptr) + if (m_ws == nullptr) return 0; CMStringA id(GenerateMessageId()); diff --git a/src/mir_app/src/mir_app.def b/src/mir_app/src/mir_app.def index db2c589d2f..9486812235 100644 --- a/src/mir_app/src/mir_app.def +++ b/src/mir_app/src/mir_app.def @@ -676,8 +676,8 @@ UnregisterHppLogger @786 ?RegisterSrmmLog@@YGPAXPAUCMPlugin@@PBDPB_WP6APAVCSrmmLogWindow@@AAVCMsgDialog@@@Z@Z @805 NONAME ?UnregisterSrmmLog@@YGXPAX@Z @806 NONAME ?GetType@CRtfLogWindow@@UAEHXZ @807 NONAME -_WebSocket_InitHeader@12 @809 NONAME -_WebSocket_Connect@12 @810 NONAME +?run@MWebSocket@@QAEXXZ @809 NONAME +?connect@MWebSocket@@QAEPAUMHttpResponse@@PAXPBDPBUMHttpHeaders@@@Z @810 NONAME ?log@CSrmmBaseDialog@@QBEPAVCSrmmLogWindow@@XZ @811 NONAME Netlib_Dump @812 NONAME ?ProtoBroadcastAsync@PROTO_INTERFACE@@QAEXIHHPAXJ@Z @813 NONAME @@ -736,8 +736,8 @@ _Netlib_GetTlsUnique@12 @831 NONAME ?IsPluginOnWhiteList@@YG_NPBD@Z @866 NONAME ?SetPluginOnWhiteList@@YGXPBD_N@Z @867 NONAME Chat_Mute @868 -_WebSocket_SendBinary@12 @869 NONAME -_WebSocket_SendText@8 @870 NONAME +?sendBinary@MWebSocket@@QAEXPBXI@Z @869 NONAME +?sendText@MWebSocket@@QAEXPBD@Z @870 NONAME ?OnContactAdded@PROTO_INTERFACE@@UAEXI@Z @871 NONAME _Netlib_SslConnect@12 @872 NONAME _Netlib_SslFree@4 @873 NONAME @@ -977,3 +977,11 @@ g_hevEventSetJson @1109 NONAME ?addReaction@EventInfo@DB@@QAEXPBD@Z @1113 NONAME ?delReaction@EventInfo@DB@@QAEXPBD@Z @1114 NONAME ?getText@EventInfo@DB@@QBEPA_WXZ @1115 NONAME +??0MWebSocket@@QAE@XZ @1116 NONAME +??1MWebSocket@@QAE@XZ @1117 NONAME +??_7MWebSocket@@6B@ @1118 NONAME +?terminate@MWebSocket@@QAEXXZ @1119 NONAME +??0MJsonWebSocket@@QAE@XZ @1120 NONAME +??1MJsonWebSocket@@QAE@XZ @1121 NONAME +??_7MJsonWebSocket@@6B@ @1122 NONAME +?process@MJsonWebSocket@@EAEXPBEI@Z @1123 NONAME diff --git a/src/mir_app/src/mir_app64.def b/src/mir_app/src/mir_app64.def index f4c00a0a66..a1a44174a0 100644 --- a/src/mir_app/src/mir_app64.def +++ b/src/mir_app/src/mir_app64.def @@ -676,8 +676,8 @@ UnregisterHppLogger @786 ?RegisterSrmmLog@@YAPEAXPEAUCMPlugin@@PEBDPEB_WP6APEAVCSrmmLogWindow@@AEAVCMsgDialog@@@Z@Z @805 NONAME ?UnregisterSrmmLog@@YAXPEAX@Z @806 NONAME ?GetType@CRtfLogWindow@@UEAAHXZ @807 NONAME -WebSocket_InitHeader @809 NONAME -WebSocket_Connect @810 NONAME +?run@MWebSocket@@QEAAXXZ @809 NONAME +?connect@MWebSocket@@QEAAPEAUMHttpResponse@@PEAXPEBDPEBUMHttpHeaders@@@Z @810 NONAME ?log@CSrmmBaseDialog@@QEBAPEAVCSrmmLogWindow@@XZ @811 NONAME Netlib_Dump @812 NONAME ?ProtoBroadcastAsync@PROTO_INTERFACE@@QEAAXIHHPEAX_J@Z @813 NONAME @@ -736,8 +736,8 @@ Netlib_GetTlsUnique @831 NONAME ?IsPluginOnWhiteList@@YA_NPEBD@Z @866 NONAME ?SetPluginOnWhiteList@@YAXPEBD_N@Z @867 NONAME Chat_Mute @868 -WebSocket_SendBinary @869 NONAME -WebSocket_SendText @870 NONAME +?sendBinary@MWebSocket@@QEAAXPEBX_K@Z @869 NONAME +?sendText@MWebSocket@@QEAAXPEBD@Z @870 NONAME ?OnContactAdded@PROTO_INTERFACE@@UEAAXI@Z @871 NONAME Netlib_SslConnect @872 NONAME Netlib_SslFree @873 NONAME @@ -977,3 +977,11 @@ g_hevEventSetJson @1103 NONAME ?addReaction@EventInfo@DB@@QEAAXPEBD@Z @1107 NONAME ?delReaction@EventInfo@DB@@QEAAXPEBD@Z @1108 NONAME ?getText@EventInfo@DB@@QEBAPEA_WXZ @1109 NONAME +??0MWebSocket@@QEAA@XZ @1110 NONAME +??1MWebSocket@@QEAA@XZ @1111 NONAME +??_7MWebSocket@@6B@ @1112 NONAME +?terminate@MWebSocket@@QEAAXXZ @1113 NONAME +??0MJsonWebSocket@@QEAA@XZ @1114 NONAME +??1MJsonWebSocket@@QEAA@XZ @1115 NONAME +??_7MJsonWebSocket@@6B@ @1116 NONAME +?process@MJsonWebSocket@@EEAAXPEBE_K@Z @1117 NONAME diff --git a/src/mir_app/src/netlib_websocket.cpp b/src/mir_app/src/netlib_websocket.cpp index 9ce6f72be9..b92d3c168f 100644 --- a/src/mir_app/src/netlib_websocket.cpp +++ b/src/mir_app/src/netlib_websocket.cpp @@ -27,8 +27,73 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. #include "../../libs/zlib/src/zlib.h" -MIR_APP_DLL(MHttpResponse*) WebSocket_Connect(HNETLIBUSER nlu, const char *szHost, const MHttpHeaders *pHeaders) +struct WSHeader { + WSHeader() + { + memset(this, 0, sizeof(*this)); + } + + bool bIsFinal, bIsMasked; + int opCode, firstByte; + size_t payloadSize, headerSize; + + bool init(const void *pData, size_t bufSize) + { + if (bufSize < 2) + return false; + + auto *buf = (const uint8_t *)pData; + bIsFinal = (buf[0] & 0x80) != 0; + bIsMasked = (buf[1] & 0x80) != 0; + opCode = buf[0] & 0x0F; + firstByte = buf[1] & 0x7F; + headerSize = 2 + (firstByte == 0x7E ? 2 : 0) + (firstByte == 0x7F ? 8 : 0) + (bIsMasked ? 4 : 0); + if (bufSize < headerSize) + return false; + + uint64_t tmpSize = 0; + switch (firstByte) { + case 0x7F: + tmpSize += ((uint64_t)buf[2]) << 56; + tmpSize += ((uint64_t)buf[3]) << 48; + tmpSize += ((uint64_t)buf[4]) << 40; + tmpSize += ((uint64_t)buf[5]) << 32; + tmpSize += ((uint64_t)buf[6]) << 24; + tmpSize += ((uint64_t)buf[7]) << 16; + tmpSize += ((uint64_t)buf[8]) << 8; + tmpSize += ((uint64_t)buf[9]); + break; + + case 0x7E: + tmpSize += ((uint64_t)buf[2]) << 8; + tmpSize += ((uint64_t)buf[3]); + break; + + default: + tmpSize = firstByte; + } + payloadSize = tmpSize; + return true; + } +}; + +MWebSocket::MWebSocket() +{ +} + +MWebSocket::~MWebSocket() +{ + if (m_hConn) + Netlib_CloseHandle(m_hConn); +} + +//////////////////////////////////////////////////////////////////////////////////////// + +MHttpResponse* MWebSocket::connect(HANDLE nlu, const char *szHost, const MHttpHeaders *pHeaders) +{ + m_nlu = (HNETLIBUSER)nlu; + CMStringA tmpHost(szHost); // connect to the gateway server @@ -54,57 +119,19 @@ MIR_APP_DLL(MHttpResponse*) WebSocket_Connect(HNETLIBUSER nlu, const char *szHos for (auto &it: *pHeaders) nlhr.AddHeader(it->szName, it->szValue); - auto *pReply = Netlib_HttpTransaction(nlu, &nlhr); + auto *pReply = Netlib_HttpTransaction(m_nlu, &nlhr); if (pReply == nullptr) { - Netlib_Logf(nlu, "Error establishing WebSocket connection to %s, send failed", tmpHost.c_str()); + Netlib_Logf(m_nlu, "Error establishing WebSocket connection to %s, send failed", tmpHost.c_str()); return nullptr; } + m_hConn = pReply->nlc; if (pReply->resultCode != 101) - Netlib_Logf(nlu, "Error establishing WebSocket connection to %s, status %d", tmpHost.c_str(), pReply->resultCode); + Netlib_Logf(m_nlu, "Error establishing WebSocket connection to %s, status %d", tmpHost.c_str(), pReply->resultCode); return pReply; } -MIR_APP_DLL(bool) WebSocket_InitHeader(WSHeader &hdr, const void *pData, size_t bufSize) -{ - if (bufSize < 2) - return false; - - auto *buf = (const uint8_t *)pData; - hdr.bIsFinal = (buf[0] & 0x80) != 0; - hdr.bIsMasked = (buf[1] & 0x80) != 0; - hdr.opCode = buf[0] & 0x0F; - hdr.firstByte = buf[1] & 0x7F; - hdr.headerSize = 2 + (hdr.firstByte == 0x7E ? 2 : 0) + (hdr.firstByte == 0x7F ? 8 : 0) + (hdr.bIsMasked ? 4 : 0); - if (bufSize < hdr.headerSize) - return false; - - uint64_t tmpSize = 0; - switch (hdr.firstByte) { - case 0x7F: - tmpSize += ((uint64_t)buf[2]) << 56; - tmpSize += ((uint64_t)buf[3]) << 48; - tmpSize += ((uint64_t)buf[4]) << 40; - tmpSize += ((uint64_t)buf[5]) << 32; - tmpSize += ((uint64_t)buf[6]) << 24; - tmpSize += ((uint64_t)buf[7]) << 16; - tmpSize += ((uint64_t)buf[8]) << 8; - tmpSize += ((uint64_t)buf[9]); - break; - - case 0x7E: - tmpSize += ((uint64_t)buf[2]) << 8; - tmpSize += ((uint64_t)buf[3]); - break; - - default: - tmpSize = hdr.firstByte; - } - hdr.payloadSize = tmpSize; - return true; -} - ///////////////////////////////////////////////////////////////////////////////////////// static void WebSocket_Send(HNETLIBCONN nlc, const void *pData, int64_t dataLen, uint8_t opCode) @@ -156,14 +183,131 @@ static void WebSocket_Send(HNETLIBCONN nlc, const void *pData, int64_t dataLen, Netlib_Send(nlc, sendBuf, int(dataLen + cbLen), MSG_NODUMP); } -MIR_APP_DLL(void) WebSocket_SendText(HNETLIBCONN nlc, const char *pData) +void MWebSocket::sendText(const char *pData) +{ + if (m_hConn && pData) { + mir_cslock lck(m_cs); + WebSocket_Send(m_hConn, pData, strlen(pData), 1); + } +} + +void MWebSocket::sendBinary(const void *pData, size_t dataLen) { - if (nlc && pData) - WebSocket_Send(nlc, pData, strlen(pData), 1); + if (m_hConn && pData) { + mir_cslock lck(m_cs); + WebSocket_Send(m_hConn, pData, dataLen, 2); + } +} + +///////////////////////////////////////////////////////////////////////////////////////// + +void MWebSocket::terminate() +{ + m_bTerminated = true; + + if (m_hConn) + Netlib_Shutdown(m_hConn); +} + +///////////////////////////////////////////////////////////////////////////////////////// + +void MWebSocket::run() +{ + int offset = 0; + MBinBuffer netbuf; + + while (!m_bTerminated) { + unsigned char buf[2048]; + int bufSize = Netlib_Recv(m_hConn, (char *)buf + offset, _countof(buf) - offset, MSG_NODUMP); + if (bufSize == 0) { + Netlib_Log(m_nlu, "Websocket connection gracefully closed"); + break; + } + if (bufSize < 0) { + Netlib_Log(m_nlu, "Websocket connection error, exiting"); + break; + } + + WSHeader hdr; + if (!hdr.init(buf, bufSize)) { + offset += bufSize; + continue; + } + offset = 0; + + // we have some additional data, not only opcode + if ((size_t)bufSize > hdr.headerSize) { + size_t currPacketSize = bufSize - hdr.headerSize; + netbuf.append(buf, bufSize); + while (currPacketSize < hdr.payloadSize) { + int result = Netlib_Recv(m_hConn, (char *)buf, _countof(buf), MSG_NODUMP); + if (result == 0) { + Netlib_Log(m_nlu, "Websocket connection gracefully closed"); + break; + } + if (result < 0) { + Netlib_Log(m_nlu, "Websocket connection error, exiting"); + break; + } + currPacketSize += result; + netbuf.append(buf, result); + } + } + + // read all payloads from the current buffer, one by one + size_t prevSize = 0; + while (true) { + switch (hdr.opCode) { + case 0: // text packet + case 1: // binary packet + case 2: // continuation + if (hdr.bIsFinal) { + // process a packet here + process((uint8_t*)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize); + } + break; + + case 8: // close + Netlib_Log(m_nlu, "server required to exit"); + m_bTerminated = true; // simply reconnect, don't exit + break; + + case 9: // ping + Netlib_Log(m_nlu, "ping received"); + Netlib_Send(m_hConn, (char *)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0); + break; + } + + if (hdr.bIsFinal) + netbuf.remove(hdr.headerSize + hdr.payloadSize); + + if (netbuf.length() == 0) + break; + + // if we have not enough data for header, continue reading + if (!hdr.init(netbuf.data(), netbuf.length())) + break; + + // if we have not enough data for data, continue reading + if (hdr.headerSize + hdr.payloadSize > netbuf.length()) + break; + + if (prevSize == netbuf.length()) { + netbuf.remove(prevSize); + break; + } + + prevSize = netbuf.length(); + } + } } -MIR_APP_DLL(void) WebSocket_SendBinary(HNETLIBCONN nlc, const void *pData, size_t dataLen) +void MJsonWebSocket::process(const uint8_t *buf, size_t cbLen) { - if (nlc && pData) - WebSocket_Send(nlc, pData, dataLen, 2); + CMStringA szJson((char*)buf, (int)cbLen); + Netlib_Logf(m_nlu, "JSON received:\n%s", szJson.c_str()); + + JSONNode root = JSONNode::parse(szJson); + if (root) + process(root); } -- cgit v1.2.3