summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Hazan <george.hazan@gmail.com>2024-06-13 16:39:12 +0300
committerGeorge Hazan <george.hazan@gmail.com>2024-06-13 16:39:12 +0300
commit004f3d1f49c54bc62743a838161ac157ffc37e41 (patch)
tree722b78c8e8f32c16cf9959068d86f7bf775e41cf
parent778558984c08b787d0e73d692086b6935e56a156 (diff)
websocket internal code went to MWebSocket
-rw-r--r--include/m_netlib.h68
-rw-r--r--libs/win32/mir_app.libbin295310 -> 298040 bytes
-rw-r--r--libs/win64/mir_app.libbin295140 -> 298056 bytes
-rw-r--r--protocols/Discord/src/connection.cpp4
-rw-r--r--protocols/Discord/src/gateway.cpp155
-rw-r--r--protocols/Discord/src/proto.cpp4
-rw-r--r--protocols/Discord/src/proto.h16
-rw-r--r--protocols/Discord/src/voice_client.cpp159
-rw-r--r--protocols/Steam/src/steam_login.cpp2
-rw-r--r--protocols/Steam/src/steam_proto.cpp2
-rw-r--r--protocols/Steam/src/steam_proto.h5
-rw-r--r--protocols/Steam/src/steam_server.cpp117
-rw-r--r--protocols/Steam/src/steam_utils.cpp4
-rw-r--r--protocols/WhatsApp/src/message.cpp2
-rw-r--r--protocols/WhatsApp/src/proto.cpp8
-rw-r--r--protocols/WhatsApp/src/proto.h7
-rw-r--r--protocols/WhatsApp/src/server.cpp152
-rw-r--r--protocols/WhatsApp/src/utils.cpp14
-rw-r--r--src/mir_app/src/mir_app.def16
-rw-r--r--src/mir_app/src/mir_app64.def16
-rw-r--r--src/mir_app/src/netlib_websocket.cpp242
21 files changed, 382 insertions, 611 deletions
diff --git a/include/m_netlib.h b/include/m_netlib.h
index f4630248cb..5157c118e5 100644
--- a/include/m_netlib.h
+++ b/include/m_netlib.h
@@ -755,27 +755,65 @@ EXTERN_C MIR_APP_DLL(void*) Netlib_GetTlsUnique(HNETLIBCONN nlc, int &cbLen, int
/////////////////////////////////////////////////////////////////////////////////////////
// WebSocket support
-struct WSHeader
+class MIR_APP_EXPORT MWebSocket : public MNonCopyable
{
- WSHeader()
- {
- memset(this, 0, sizeof(*this));
- }
+ mir_cs m_cs;
+ bool m_bTerminated = false;
+
+protected:
+ HNETLIBUSER m_nlu = 0;
+ HNETLIBCONN m_hConn = 0;
+
+public:
+ MWebSocket();
+ ~MWebSocket();
+
+ // packet processor
+ virtual void process(const uint8_t *buf, size_t cbLen) = 0;
+
+ // connects to a WebSocket server
+ MHttpResponse* connect(HANDLE nlu, const char *szHost, const MHttpHeaders *pHeaders = nullptr);
+
+ // runs a socket reading cycle
+ void run();
+
+ // terminates a reading cycle
+ void terminate();
+
+ // sends a packet to WebSocket
+ void sendText(const char *pData);
+ void sendBinary(const void *pData, size_t strLen);
+};
+
+class MIR_APP_EXPORT MJsonWebSocket : public MWebSocket
+{
+ void process(const uint8_t *buf, size_t cbLen) override;
+
+public:
+ MJsonWebSocket() {}
+
+ virtual void process(const class JSONNode &json) = 0;
+};
+
+template<class T> class WebSocket : public MWebSocket
+{
+ T *p;
+
+public:
+ WebSocket(T *_1) : p(_1) {}
- bool bIsFinal, bIsMasked;
- int opCode, firstByte;
- size_t payloadSize, headerSize;
+ void process(const uint8_t *buf, size_t cbLen) override;
};
-// connects to a WebSocket server
-EXTERN_C MIR_APP_DLL(MHttpResponse*) WebSocket_Connect(HNETLIBUSER, const char *szHost, const MHttpHeaders *pHeaders = nullptr);
+template<class T> class JsonWebSocket : public MJsonWebSocket
+{
+ T *p;
-// validates that the provided buffer contains full WebSocket datagram
-EXTERN_C MIR_APP_DLL(bool) WebSocket_InitHeader(WSHeader &hdr, const void *pData, size_t bufSize);
+public:
+ JsonWebSocket(T *_1) : p(_1) {}
-// sends a packet to WebSocket
-EXTERN_C MIR_APP_DLL(void) WebSocket_SendText(HNETLIBCONN nlc, const char *pData);
-EXTERN_C MIR_APP_DLL(void) WebSocket_SendBinary(HNETLIBCONN nlc, const void *pData, size_t strLen);
+ void process(const JSONNode &node) override;
+};
/////////////////////////////////////////////////////////////////////////////////////////
// Netlib hooks (0.8+)
diff --git a/libs/win32/mir_app.lib b/libs/win32/mir_app.lib
index 37b8b0a798..24ebf7eb34 100644
--- a/libs/win32/mir_app.lib
+++ b/libs/win32/mir_app.lib
Binary files differ
diff --git a/libs/win64/mir_app.lib b/libs/win64/mir_app.lib
index 80a84c2a02..e8b7a9e6f0 100644
--- a/libs/win64/mir_app.lib
+++ b/libs/win64/mir_app.lib
Binary files differ
diff --git a/protocols/Discord/src/connection.cpp b/protocols/Discord/src/connection.cpp
index 96cc32ee25..ff7e765ecb 100644
--- a/protocols/Discord/src/connection.cpp
+++ b/protocols/Discord/src/connection.cpp
@@ -93,8 +93,8 @@ void CDiscordProto::ShutdownSession()
pMfaDialog->Close();
if (m_hWorkerThread)
SetEvent(m_evRequestsQueue);
- if (m_hGatewayConnection)
- Netlib_Shutdown(m_hGatewayConnection);
+ if (m_ws)
+ m_ws->terminate();
if (m_hAPIConnection)
Netlib_Shutdown(m_hAPIConnection);
diff --git a/protocols/Discord/src/gateway.cpp b/protocols/Discord/src/gateway.cpp
index dc7d1da26e..d6b7846eeb 100644
--- a/protocols/Discord/src/gateway.cpp
+++ b/protocols/Discord/src/gateway.cpp
@@ -22,12 +22,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
bool CDiscordProto::GatewaySend(const JSONNode &pRoot)
{
- if (m_hGatewayConnection == nullptr)
+ if (m_ws == nullptr)
return false;
json_string szText = pRoot.write();
debugLogA("Gateway send: %s", szText.c_str());
- WebSocket_SendText(m_hGatewayConnection, szText.c_str());
+ m_ws->sendText(szText.c_str());
return true;
}
@@ -51,7 +51,8 @@ bool CDiscordProto::GatewayThreadWorker()
hdrs.AddHeader("Cookie", m_szWSCookie);
}
- NLHR_PTR pReply(WebSocket_Connect(m_hGatewayNetlibUser, m_szGateway + "/?encoding=json&v=8", &hdrs));
+ JsonWebSocket<CDiscordProto> ws(this);
+ NLHR_PTR pReply(ws.connect(m_hGatewayNetlibUser, m_szGateway + "/?encoding=json&v=8", &hdrs));
if (pReply == nullptr) {
debugLogA("Gateway connection failed, exiting");
return false;
@@ -72,164 +73,62 @@ bool CDiscordProto::GatewayThreadWorker()
// succeeded!
debugLogA("Gateway connection succeeded");
- m_hGatewayConnection = pReply->nlc;
-
- bool bExit = false;
- int offset = 0;
- MBinBuffer netbuf;
-
- while (!bExit) {
- if (m_bTerminated)
- break;
-
- unsigned char buf[2048];
- int bufSize = Netlib_Recv(m_hGatewayConnection, (char*)buf + offset, _countof(buf) - offset, MSG_NODUMP);
- if (bufSize == 0) {
- debugLogA("Gateway connection gracefully closed");
- bExit = !m_bTerminated;
- break;
- }
- if (bufSize < 0) {
- debugLogA("Gateway connection error, exiting");
- break;
- }
- WSHeader hdr;
- if (!WebSocket_InitHeader(hdr, buf, bufSize)) {
- offset += bufSize;
- continue;
- }
- offset = 0;
-
- debugLogA("Got packet: buffer = %d, opcode = %d, headerSize = %d, final = %d, masked = %d", bufSize, hdr.opCode, hdr.headerSize, hdr.bIsFinal, hdr.bIsMasked);
-
- // we have some additional data, not only opcode
- if ((size_t)bufSize > hdr.headerSize) {
- size_t currPacketSize = bufSize - hdr.headerSize;
- netbuf.append(buf, bufSize);
- while (currPacketSize < hdr.payloadSize) {
- int result = Netlib_Recv(m_hGatewayConnection, (char*)buf, _countof(buf), MSG_NODUMP);
- if (result == 0) {
- debugLogA("Gateway connection gracefully closed");
- bExit = !m_bTerminated;
- break;
- }
- if (result < 0) {
- debugLogA("Gateway connection error, exiting");
- break;
- }
- currPacketSize += result;
- netbuf.append(buf, result);
- }
- }
-
- // read all payloads from the current buffer, one by one
- size_t prevSize = 0;
- while (true) {
- switch (hdr.opCode) {
- case 0: // text packet
- case 1: // binary packet
- case 2: // continuation
- if (hdr.bIsFinal) {
- // process a packet here
- CMStringA szJson((char*)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize);
- debugLogA("JSON received:\n%s", szJson.c_str());
- JSONNode root = JSONNode::parse(szJson);
- if (root)
- bExit = GatewayProcess(root);
- }
- break;
-
- case 8: // close
- debugLogA("server required to exit");
- bExit = true; // simply reconnect, don't exit
- break;
-
- case 9: // ping
- debugLogA("ping received");
- Netlib_Send(m_hGatewayConnection, (char*)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0);
- break;
- }
-
- if (hdr.bIsFinal)
- netbuf.remove(hdr.headerSize + hdr.payloadSize);
-
- if (netbuf.length() == 0)
- break;
-
- // if we have not enough data for header, continue reading
- if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length()))
- break;
-
- // if we have not enough data for data, continue reading
- if (hdr.headerSize + hdr.payloadSize > netbuf.length())
- break;
-
- debugLogA("Got inner packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d", netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked);
- if (prevSize == netbuf.length()) {
- netbuf.remove(prevSize);
- debugLogA("dropping current packet, exiting");
- break;
- }
-
- prevSize = netbuf.length();
- }
- }
+ m_ws = &ws;
+ ws.run();
- Netlib_CloseHandle(m_hGatewayConnection);
- m_hGatewayConnection = nullptr;
- return bExit;
+ m_ws = nullptr;
+ return true;
}
//////////////////////////////////////////////////////////////////////////////////////
// handles server commands
-bool CDiscordProto::GatewayProcess(const JSONNode &pRoot)
+void JsonWebSocket<CDiscordProto>::process(const JSONNode &json)
{
- int opCode = pRoot["op"].as_int();
+ int opCode = json["op"].as_int();
switch (opCode) {
case OPCODE_DISPATCH: // process incoming command
{
- int iSeq = pRoot["s"].as_int();
+ int iSeq = json["s"].as_int();
if (iSeq != 0)
- m_iGatewaySeq = iSeq;
+ p->m_iGatewaySeq = iSeq;
+
+ CMStringW wszCommand = json["t"].as_mstring();
+ p->debugLogA("got a server command to dispatch: %S", wszCommand.c_str());
- CMStringW wszCommand = pRoot["t"].as_mstring();
- debugLogA("got a server command to dispatch: %S", wszCommand.c_str());
-
- GatewayHandlerFunc pFunc = GetHandler(wszCommand);
+ GatewayHandlerFunc pFunc = p->GetHandler(wszCommand);
if (pFunc)
- (this->*pFunc)(pRoot["d"]);
+ (p->*pFunc)(json["d"]);
}
break;
case OPCODE_RECONNECT: // we need to reconnect asap
- debugLogA("we need to reconnect, leaving worker thread");
- return true;
+ p->debugLogA("we need to reconnect, leaving worker thread");
+ p->m_bTerminated = true;
+ return;
case OPCODE_INVALID_SESSION: // session invalidated
- if (pRoot["d"].as_bool()) // session can be resumed
- GatewaySendResume();
+ if (json["d"].as_bool()) // session can be resumed
+ p->GatewaySendResume();
else {
Sleep(5000); // 5 seconds - recommended timeout
- GatewaySendIdentify();
+ p->GatewaySendIdentify();
}
break;
case OPCODE_HELLO: // hello
- m_iHartbeatInterval = pRoot["d"]["heartbeat_interval"].as_int();
+ p->m_iHartbeatInterval = json["d"]["heartbeat_interval"].as_int();
- GatewaySendIdentify();
+ p->GatewaySendIdentify();
break;
-
+
case OPCODE_HEARTBEAT_ACK: // heartbeat ack
break;
default:
- debugLogA("ACHTUNG! Unknown opcode: %d, report it to developer", opCode);
+ p->debugLogA("ACHTUNG! Unknown opcode: %d, report it to developer", opCode);
}
-
- return false;
}
//////////////////////////////////////////////////////////////////////////////////////
diff --git a/protocols/Discord/src/proto.cpp b/protocols/Discord/src/proto.cpp
index aef8da827b..7cfeba6d56 100644
--- a/protocols/Discord/src/proto.cpp
+++ b/protocols/Discord/src/proto.cpp
@@ -206,8 +206,8 @@ void CDiscordProto::OnShutdown()
for (auto &it : arGuilds)
it->SaveToFile();
- if (m_hGatewayConnection)
- Netlib_Shutdown(m_hGatewayConnection);
+ if (m_ws)
+ m_ws->terminate();
if (g_plugin.bVoiceService)
CallService(MS_VOICESERVICE_UNREGISTER, (WPARAM)m_szModuleName, 0);
diff --git a/protocols/Discord/src/proto.h b/protocols/Discord/src/proto.h
index 5f3e88d4c9..2946e677f8 100644
--- a/protocols/Discord/src/proto.h
+++ b/protocols/Discord/src/proto.h
@@ -173,8 +173,9 @@ class CDiscordVoiceCall : public MZeroedObject
CDiscordProto *ppro;
+ JsonWebSocket<CDiscordVoiceCall> *m_ws;
+
CTimer m_timer;
- HNETLIBCONN m_hConn;
HNETLIBBIND m_hBind;
mir_cs m_cs;
bool m_bTerminated;
@@ -189,10 +190,8 @@ class CDiscordVoiceCall : public MZeroedObject
void onTimer(CTimer *)
{
- if (m_hConn) {
- JSONNode d; d << INT_PARAM("", rand());
- write(3, d);
- }
+ JSONNode d; d << INT_PARAM("", rand());
+ write(3, d);
}
static void GetConnection(HNETLIBCONN /*hNewConnection*/, uint32_t /*dwRemoteIP*/, void *pExtra);
@@ -209,7 +208,6 @@ public:
return !m_bTerminated;
}
- bool connect(HNETLIBUSER);
void write(int op, JSONNode &d);
void process(const JSONNode &node);
@@ -298,6 +296,7 @@ class CDiscordProto : public PROTO<CDiscordProto>
friend class CMfaDialog;
friend class CGroupchatInviteDlg;
friend class CDiscordVoiceCall;
+ friend class JsonWebSocket<CDiscordProto>;
class CDiscordProtoImpl
{
@@ -370,13 +369,12 @@ class CDiscordProto : public PROTO<CDiscordProto>
m_szWSCookie; // cookie used for establishing websocket connection
HNETLIBUSER m_hGatewayNetlibUser; // the separate netlib user handle for gateways
- HNETLIBCONN m_hGatewayConnection; // gateway connection
-
+ JsonWebSocket<CDiscordProto> *m_ws;
+
void __cdecl GatewayThread(void*);
bool GatewayThreadWorker(void);
bool GatewaySend(const JSONNode &pNode);
- bool GatewayProcess(const JSONNode &pNode);
void GatewaySendGuildInfo(CDiscordGuild *pGuild);
void GatewaySendHeartbeat(void);
diff --git a/protocols/Discord/src/voice_client.cpp b/protocols/Discord/src/voice_client.cpp
index 09e1908007..29b051799f 100644
--- a/protocols/Discord/src/voice_client.cpp
+++ b/protocols/Discord/src/voice_client.cpp
@@ -32,6 +32,8 @@ CDiscordVoiceCall::CDiscordVoiceCall(CDiscordProto *pOwner) :
CDiscordVoiceCall::~CDiscordVoiceCall()
{
+ delete m_ws;
+
m_timer.StopSafe();
if (m_encoder) {
@@ -44,50 +46,17 @@ CDiscordVoiceCall::~CDiscordVoiceCall()
m_repacketizer = nullptr;
}
- if (m_hConn) {
- Netlib_CloseHandle(m_hConn);
- m_hConn = nullptr;
- }
-
if (m_hBind) {
Netlib_CloseHandle(m_hBind);
m_hBind = nullptr;
}
}
-bool CDiscordVoiceCall::connect(HNETLIBUSER hServer)
-{
- int nLoops = 0;
- time_t lastLoopTime = time(0);
-
- while (true) {
- time_t currTime = time(0);
- if (currTime - lastLoopTime > 3)
- nLoops = 0;
-
- nLoops++;
- if (nLoops > 5)
- break;
-
- lastLoopTime = currTime;
-
- MHttpHeaders hdrs;
- hdrs.AddHeader("Origin", "https://discord.com");
-
- NLHR_PTR pReply(WebSocket_Connect(hServer, szEndpoint + "/?encoding=json&v=8", &hdrs));
- if (pReply && pReply->resultCode == 101) {
- m_hConn = pReply->nlc;
- return true;
- }
-
- SleepEx(5000, TRUE);
- }
-
- return false;
-}
-
void CDiscordVoiceCall::write(int op, JSONNode &d)
{
+ if (m_ws == nullptr)
+ return;
+
d.set_name("d");
JSONNode payload;
@@ -97,7 +66,12 @@ void CDiscordVoiceCall::write(int op, JSONNode &d)
ppro->debugLogA("Voice JSON sent: %s", json.c_str());
mir_cslock lck(m_cs);
- WebSocket_SendText(m_hConn, json.c_str());
+ m_ws->sendText(json.c_str());
+}
+
+void JsonWebSocket<CDiscordVoiceCall>::process(const JSONNode &json)
+{
+ p->process(json);
}
//////////////////////////////////////////////////////////////////////////////////////////
@@ -186,111 +160,20 @@ void CDiscordProto::VoiceClientThread(void *param)
auto *pCall = (CDiscordVoiceCall *)param;
debugLogA("Entering voice websocket thread");
- if (!pCall->connect(m_hGatewayNetlibUser)) {
+ MHttpHeaders hdrs;
+ hdrs.AddHeader("Origin", "https://discord.com");
+
+ JsonWebSocket<CDiscordVoiceCall> ws(pCall);
+
+ NLHR_PTR pReply(ws.connect(m_hGatewayNetlibUser, pCall->szEndpoint + "/?encoding=json&v=8", &hdrs));
+ if (!pReply || pReply->resultCode != 101) {
debugLogA("Voice gateway connection failed, exiting");
return;
}
- int offset = 0;
- MBinBuffer netbuf;
-
- while (*pCall) {
- if (m_bTerminated)
- break;
-
- unsigned char buf[2048];
- int bufSize = Netlib_Recv(pCall->m_hConn, (char *)buf + offset, _countof(buf) - offset, MSG_NODUMP);
- if (bufSize == 0) {
- debugLogA("Voice gateway connection gracefully closed");
- pCall->m_bTerminated = !m_bTerminated;
- break;
- }
- if (bufSize < 0) {
- debugLogA("Voice gateway connection error, exiting");
- break;
- }
-
- WSHeader hdr;
- if (!WebSocket_InitHeader(hdr, buf, bufSize)) {
- offset += bufSize;
- continue;
- }
- offset = 0;
-
- // we have some additional data, not only opcode
- if ((size_t)bufSize > hdr.headerSize) {
- size_t currPacketSize = bufSize - hdr.headerSize;
- netbuf.append(buf, bufSize);
- while (currPacketSize < hdr.payloadSize) {
- int result = Netlib_Recv(pCall->m_hConn, (char *)buf, _countof(buf), MSG_NODUMP);
- if (result == 0) {
- debugLogA("Voice gateway connection gracefully closed");
- pCall->m_bTerminated = !m_bTerminated;
- break;
- }
- if (result < 0) {
- debugLogA("Voice gateway connection error, exiting");
- break;
- }
- currPacketSize += result;
- netbuf.append(buf, result);
- }
- }
-
- // read all payloads from the current buffer, one by one
- size_t prevSize = 0;
- while (true) {
- switch (hdr.opCode) {
- case 0: // text packet
- case 1: // binary packet
- case 2: // continuation
- if (hdr.bIsFinal) {
- // process a packet here
- CMStringA szJson((char *)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize);
- debugLogA("Voice JSON received:\n%s", szJson.c_str());
- JSONNode root = JSONNode::parse(szJson);
- if (root)
- pCall->process(root);
- }
- break;
-
- case 8: // close
- debugLogA("Voice server required to exit");
- pCall->m_bTerminated = true; // simply reconnect, don't exit
- break;
-
- case 9: // ping
- debugLogA("ping received");
- Netlib_Send(pCall->m_hConn, (char *)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0);
- break;
- }
-
- if (hdr.bIsFinal)
- netbuf.remove(hdr.headerSize + hdr.payloadSize);
-
- if (netbuf.length() == 0)
- break;
-
- // if we have not enough data for header, continue reading
- if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length()))
- break;
-
- // if we have not enough data for data, continue reading
- if (hdr.headerSize + hdr.payloadSize > netbuf.length())
- break;
-
- if (prevSize == netbuf.length()) {
- netbuf.remove(prevSize);
- debugLogA("dropping current packet, exiting");
- break;
- }
-
- prevSize = netbuf.length();
- }
- }
-
- debugLogA("Exiting voice websocket thread");
- Netlib_CloseHandle(pCall->m_hConn); pCall->m_hConn = 0;
+ pCall->m_ws = &ws;
+ ws.run();
+ pCall->m_ws = nullptr;
}
/////////////////////////////////////////////////////////////////////////////////////////
diff --git a/protocols/Steam/src/steam_login.cpp b/protocols/Steam/src/steam_login.cpp
index ce235b635c..e7a583dac1 100644
--- a/protocols/Steam/src/steam_login.cpp
+++ b/protocols/Steam/src/steam_login.cpp
@@ -123,7 +123,7 @@ void CSteamProto::DeleteAuthSettings()
bool CSteamProto::IsOnline()
{
- return m_iStatus > ID_STATUS_OFFLINE && m_hServerConn != nullptr;
+ return m_iStatus > ID_STATUS_OFFLINE && m_ws != nullptr;
}
bool CSteamProto::IsMe(const char *steamId)
diff --git a/protocols/Steam/src/steam_proto.cpp b/protocols/Steam/src/steam_proto.cpp
index 7cb7ec5ae5..77dc767621 100644
--- a/protocols/Steam/src/steam_proto.cpp
+++ b/protocols/Steam/src/steam_proto.cpp
@@ -290,7 +290,7 @@ int CSteamProto::SetStatus(int new_status)
Logout();
}
- else if (m_hServerConn == nullptr && !IsStatusConnecting(m_iStatus)) {
+ else if (m_ws == nullptr && !IsStatusConnecting(m_iStatus)) {
m_iStatus = ID_STATUS_CONNECTING;
ForkThread(&CSteamProto::ServerThread);
diff --git a/protocols/Steam/src/steam_proto.h b/protocols/Steam/src/steam_proto.h
index 7110861a58..18c077e5ae 100644
--- a/protocols/Steam/src/steam_proto.h
+++ b/protocols/Steam/src/steam_proto.h
@@ -66,6 +66,7 @@ class CSteamProto : public PROTO<CSteamProto>
friend class CSteamOptionsMain;
friend class CSteamOptionsBlockList;
friend class PollRequest;
+ friend class WebSocket<CSteamProto>;
ptrW m_password;
bool m_bTerminated;
@@ -87,7 +88,8 @@ class CSteamProto : public PROTO<CSteamProto>
std::map<HANDLE, time_t> m_mpOutMessages;
// connection
- HNETLIBCONN m_hServerConn;
+ WebSocket<CSteamProto> *m_ws;
+
void __cdecl ServerThread(void *);
bool ServerThreadStub(const char *szHost);
@@ -95,7 +97,6 @@ class CSteamProto : public PROTO<CSteamProto>
OBJLIST<ProtoRequest> m_arRequests;
void ProcessMulti(const uint8_t *buf, size_t cbLen);
- void ProcessPacket(const uint8_t *buf, size_t cbLen);
void ProcessMessage(const uint8_t *buf, size_t cbLen);
void WSSend(EMsg msgType, const ProtobufCppMessage &msg);
diff --git a/protocols/Steam/src/steam_server.cpp b/protocols/Steam/src/steam_server.cpp
index c2c9f1638c..8dc8274240 100644
--- a/protocols/Steam/src/steam_server.cpp
+++ b/protocols/Steam/src/steam_server.cpp
@@ -30,7 +30,7 @@ void __cdecl CSteamProto::ServerThread(void *)
}
srand(time(0));
- m_hServerConn = nullptr;
+ m_ws = nullptr;
CMStringA szHost;
do {
@@ -44,7 +44,9 @@ void __cdecl CSteamProto::ServerThread(void *)
bool CSteamProto::ServerThreadStub(const char *szHost)
{
- NLHR_PTR pReply(WebSocket_Connect(m_hNetlibUser, szHost));
+ WebSocket<CSteamProto> ws(this);
+
+ NLHR_PTR pReply(ws.connect(m_hNetlibUser, szHost));
if (pReply == nullptr) {
debugLogA("websocket connection failed");
return false;
@@ -55,116 +57,21 @@ bool CSteamProto::ServerThreadStub(const char *szHost)
return false;
}
- m_hServerConn = pReply->nlc;
+ m_ws = &ws;
+
debugLogA("Websocket connection succeeded");
// Send init packets
Login();
- bool bExit = false;
- int offset = 0;
- MBinBuffer netbuf;
-
- while (!bExit) {
- if (m_bTerminated)
- break;
-
- unsigned char buf[2048];
- int bufSize = Netlib_Recv(m_hServerConn, (char *)buf + offset, _countof(buf) - offset, 0);
- if (bufSize == 0) {
- debugLogA("Gateway connection gracefully closed");
- bExit = !m_bTerminated;
- break;
- }
- if (bufSize < 0) {
- debugLogA("Gateway connection error, exiting");
- break;
- }
-
- WSHeader hdr;
- if (!WebSocket_InitHeader(hdr, buf, bufSize)) {
- offset += bufSize;
- continue;
- }
- offset = 0;
-
- debugLogA("Got packet: buffer = %d, opcode = %d, headerSize = %d, final = %d, masked = %d", bufSize, hdr.opCode, hdr.headerSize, hdr.bIsFinal, hdr.bIsMasked);
-
- // we have some additional data, not only opcode
- if ((size_t)bufSize > hdr.headerSize) {
- size_t currPacketSize = bufSize - hdr.headerSize;
- netbuf.append(buf, bufSize);
- while (currPacketSize < hdr.payloadSize) {
- int result = Netlib_Recv(m_hServerConn, (char *)buf, _countof(buf), 0);
- if (result == 0) {
- debugLogA("Gateway connection gracefully closed");
- bExit = !m_bTerminated;
- break;
- }
- if (result < 0) {
- debugLogA("Gateway connection error, exiting");
- break;
- }
- currPacketSize += result;
- netbuf.append(buf, result);
- }
- }
-
- // read all payloads from the current buffer, one by one
- size_t prevSize = 0;
- while (true) {
- switch (hdr.opCode) {
- case 0: // text packet
- case 1: // binary packet
- case 2: // continuation
- if (hdr.bIsFinal)
- ProcessPacket((const uint8_t *)netbuf.data() + hdr.headerSize, hdr.payloadSize);
- break;
-
- case 8: // close
- debugLogA("server required to exit");
- bExit = true; // simply reconnect, don't exit
- break;
-
- case 9: // ping
- debugLogA("ping received");
- Netlib_Send(m_hServerConn, (char *)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0);
- break;
- }
-
- if (hdr.bIsFinal)
- netbuf.remove(hdr.headerSize + hdr.payloadSize);
-
- if (netbuf.length() == 0)
- break;
-
- // if we have not enough data for header, continue reading
- if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length()))
- break;
-
- // if we have not enough data for data, continue reading
- if (hdr.headerSize + hdr.payloadSize > netbuf.length())
- break;
-
- debugLogA("Got inner packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d", netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked);
- if (prevSize == netbuf.length()) {
- netbuf.remove(prevSize);
- debugLogA("dropping current packet, exiting");
- break;
- }
-
- prevSize = netbuf.length();
- }
- }
-
- Netlib_CloseHandle(m_hServerConn);
- m_hServerConn = nullptr;
- return bExit;
+ ws.run();
+ m_ws = nullptr;
+ return false;
}
/////////////////////////////////////////////////////////////////////////////////////////
-void CSteamProto::ProcessPacket(const uint8_t *buf, size_t cbLen)
+void WebSocket<CSteamProto>::process(const uint8_t *buf, size_t cbLen)
{
uint32_t dwSign = *(uint32_t *)buf;
EMsg msgType = (EMsg)(dwSign & ~STEAM_PROTOCOL_MASK);
@@ -172,9 +79,9 @@ void CSteamProto::ProcessPacket(const uint8_t *buf, size_t cbLen)
// now process the body
if (msgType == EMsg::Multi) {
buf += 8; cbLen -= 8;
- ProcessMulti(buf, cbLen);
+ p->ProcessMulti(buf, cbLen);
}
- else ProcessMessage(buf, cbLen);
+ else p->ProcessMessage(buf, cbLen);
}
void CSteamProto::ProcessMulti(const uint8_t *buf, size_t cbLen)
diff --git a/protocols/Steam/src/steam_utils.cpp b/protocols/Steam/src/steam_utils.cpp
index 1faf408f5d..ed90bc5178 100644
--- a/protocols/Steam/src/steam_utils.cpp
+++ b/protocols/Steam/src/steam_utils.cpp
@@ -37,14 +37,12 @@ void CSteamProto::WSSendHeader(EMsg msgType, const CMsgProtoBufHeader &hdr, cons
uint32_t type = (uint32_t)msgType;
type |= STEAM_PROTOCOL_MASK;
hdrbuf.appendBefore(&type, sizeof(type));
- Netlib_Dump(m_hServerConn, hdrbuf.data(), hdrbuf.length(), true, 0);
MBinBuffer body(protobuf_c_message_get_packed_size(&msg));
protobuf_c_message_pack(&msg, body.data());
- Netlib_Dump(m_hServerConn, body.data(), body.length(), true, 0);
hdrbuf.append(body);
- WebSocket_SendBinary(m_hServerConn, hdrbuf.data(), hdrbuf.length());
+ m_ws->sendBinary(hdrbuf.data(), hdrbuf.length());
}
void CSteamProto::WSSendService(const char *pszServiceName, const ProtobufCppMessage &msg, MsgCallback pCallback)
diff --git a/protocols/WhatsApp/src/message.cpp b/protocols/WhatsApp/src/message.cpp
index a664e49953..2acf0b3a4e 100644
--- a/protocols/WhatsApp/src/message.cpp
+++ b/protocols/WhatsApp/src/message.cpp
@@ -152,7 +152,7 @@ void WhatsAppProto::OnReceiveMessage(const WANode &node)
if (WAJid(szChatId).isUser())
pszReceiptTo = szAuthor;
}
- else if (!m_hServerConn)
+ else if (!m_ws)
pszReceiptType = "inactive";
SendReceipt(szChatId, pszReceiptTo, msgId, pszReceiptType);
diff --git a/protocols/WhatsApp/src/proto.cpp b/protocols/WhatsApp/src/proto.cpp
index 9cf48f0d6b..38071deaf6 100644
--- a/protocols/WhatsApp/src/proto.cpp
+++ b/protocols/WhatsApp/src/proto.cpp
@@ -223,19 +223,19 @@ int WhatsAppProto::SetStatus(int iNewStatus)
if (m_iDesiredStatus == ID_STATUS_OFFLINE) {
SetServerStatus(m_iDesiredStatus);
- if (m_hServerConn != nullptr)
- Netlib_Shutdown(m_hServerConn);
+ if (m_ws != nullptr)
+ m_ws->terminate();
m_iStatus = m_iDesiredStatus = ID_STATUS_OFFLINE;
ProtoBroadcastAck(NULL, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)oldStatus, m_iStatus);
}
- else if (m_hServerConn == nullptr && !IsStatusConnecting(m_iStatus)) {
+ else if (m_ws == nullptr && !IsStatusConnecting(m_iStatus)) {
m_iStatus = ID_STATUS_CONNECTING;
ProtoBroadcastAck(NULL, ACKTYPE_STATUS, ACKRESULT_SUCCESS, (HANDLE)oldStatus, m_iStatus);
ForkThread(&WhatsAppProto::ServerThread);
}
- else if (m_hServerConn != nullptr) {
+ else if (m_ws != nullptr) {
SetServerStatus(m_iDesiredStatus);
m_iStatus = m_iDesiredStatus;
diff --git a/protocols/WhatsApp/src/proto.h b/protocols/WhatsApp/src/proto.h
index 3a7842bd1f..8170101d2f 100644
--- a/protocols/WhatsApp/src/proto.h
+++ b/protocols/WhatsApp/src/proto.h
@@ -164,6 +164,7 @@ struct WACollection
class WANoise
{
friend class WhatsAppProto;
+ friend class WebSocket<WhatsAppProto>;
WhatsAppProto *ppro;
uint32_t readCounter = 0, writeCounter = 0;
@@ -268,6 +269,7 @@ class WhatsAppProto : public PROTO<WhatsAppProto>
friend class WANoise;
friend class CWhatsAppQRDlg;
friend class COptionsDlg;
+ friend class WebSocket<WhatsAppProto>;
class CWhatsAppProtoImpl
{
@@ -347,7 +349,7 @@ class WhatsAppProto : public PROTO<WhatsAppProto>
/// Network ////////////////////////////////////////////////////////////////////////////
time_t m_lastRecvTime;
- HNETLIBCONN m_hServerConn;
+ WebSocket<WhatsAppProto> *m_ws;
mir_cs m_csPacketQueue;
OBJLIST<WARequestBase> m_arPacketQueue;
@@ -365,7 +367,6 @@ class WhatsAppProto : public PROTO<WhatsAppProto>
void ProcessReceipt(MCONTACT hContact, const char *msgId, bool bRead);
- bool WSReadPacket(const WSHeader &hdr, MBinBuffer &buf);
int WSSend(const ProtobufCMessage &msg);
int WSSendNode(WANode &node);
int WSSendNode(WANode &node, WA_PKT_HANDLER);
@@ -450,7 +451,7 @@ public:
~WhatsAppProto();
__forceinline bool isOnline() const
- { return m_hServerConn != 0;
+ { return m_ws != 0;
}
__forceinline void writeStr(const char *pszSetting, const JSONNode &node)
diff --git a/protocols/WhatsApp/src/server.cpp b/protocols/WhatsApp/src/server.cpp
index e6305d3dd5..bf7bf142c6 100644
--- a/protocols/WhatsApp/src/server.cpp
+++ b/protocols/WhatsApp/src/server.cpp
@@ -27,7 +27,9 @@ void WhatsAppProto::ServerThreadWorker()
MHttpHeaders hdrs;
hdrs.AddHeader("Origin", "https://web.whatsapp.com");
- NLHR_PTR pReply(WebSocket_Connect(m_hNetlibUser, "web.whatsapp.com/ws/chat", &hdrs));
+ WebSocket<WhatsAppProto> ws(this);
+
+ NLHR_PTR pReply(ws.connect(m_hNetlibUser, "web.whatsapp.com/ws/chat", &hdrs));
if (pReply == nullptr) {
debugLogA("Server connection failed, exiting");
return;
@@ -41,7 +43,7 @@ void WhatsAppProto::ServerThreadWorker()
m_noise->init();
debugLogA("Server connection succeeded");
- m_hServerConn = pReply->nlc;
+ m_ws = &ws;
m_lastRecvTime = time(0);
m_iPacketId = 1;
@@ -55,134 +57,18 @@ void WhatsAppProto::ServerThreadWorker()
msg.clienthello = &client;
WSSend(msg);
- MBinBuffer netbuf;
-
- for (m_bTerminated = false; !m_bTerminated;) {
- unsigned char buf[2048];
- int bufSize = Netlib_Recv(m_hServerConn, (char *)buf, _countof(buf), MSG_NODUMP);
- if (bufSize == 0) {
- debugLogA("Gateway connection gracefully closed");
- break;
- }
- if (bufSize < 0) {
- debugLogA("Gateway connection error, exiting");
- break;
- }
-
- netbuf.append(buf, bufSize);
-
- WSHeader hdr;
- if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length()))
- continue;
-
- // we lack some data, let's read them
- if (netbuf.length() < hdr.headerSize + hdr.payloadSize)
- if (!WSReadPacket(hdr, netbuf))
- break;
-
- // debugLogA("Got packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d",
- // netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked);
- // Netlib_Dump(m_hServerConn, netbuf.data(), netbuf.length(), false, 0);
-
- m_lastRecvTime = time(0);
-
- // read all payloads from the current buffer, one by one
- while (true) {
- MBinBuffer currPacket;
- currPacket.assign(netbuf.data() + hdr.headerSize, hdr.payloadSize);
-
- switch (hdr.opCode) {
- case 1: // json packet
- debugLogA("Text packet, skipping");
- /*
- currPacket.append("", 1); // add 0 to use strchr safely
- CMStringA szJson(pos, (int)dataSize);
-
- JSONNode root = JSONNode::parse(szJson);
- if (root) {
- debugLogA("JSON received:\n%s", start);
-
- CMStringA szPrefix(start, int(pos - start - 1));
- auto *pReq = m_arPacketQueue.find((WARequest *)&szPrefix);
- if (pReq != nullptr) {
- root << CHAR_PARAM("$id$", szPrefix);
- }
- }
- }
- */
- break;
-
- case 2: // binary packet
- if (hdr.payloadSize > 32)
- ProcessBinaryPacket(currPacket.data(), hdr.payloadSize);
- break;
-
- case 8: // close
- debugLogA("server required to exit");
- m_bRespawn = m_bTerminated = true; // simply reconnect, don't exit
- break;
-
- default:
- Netlib_Dump(m_hServerConn, currPacket.data(), hdr.payloadSize, false, 0);
- }
-
- netbuf.remove(hdr.headerSize + hdr.payloadSize);
- // debugLogA("%d bytes removed from network buffer, %d bytes remain", hdr.headerSize + hdr.payloadSize, netbuf.length());
- if (netbuf.length() == 0)
- break;
-
- // if we have not enough data for header, continue reading
- if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length())) {
- debugLogA("not enough data for header, continue reading");
- break;
- }
-
- // if we have not enough data for data, continue reading
- if (hdr.headerSize + hdr.payloadSize > netbuf.length()) {
- debugLogA("not enough place for data (%d+%d > %d), continue reading", hdr.headerSize, hdr.payloadSize, netbuf.length());
- break;
- }
-
- debugLogA("Got inner packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d",
- netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked);
- }
- }
-
- debugLogA("Server connection dropped");
- Netlib_CloseHandle(m_hServerConn);
- m_hServerConn = nullptr;
-}
-
-bool WhatsAppProto::WSReadPacket(const WSHeader &hdr, MBinBuffer &res)
-{
- size_t currPacketSize = res.length() - hdr.headerSize;
-
- char buf[1024];
- while (currPacketSize < hdr.payloadSize) {
- int result = Netlib_Recv(m_hServerConn, buf, _countof(buf), MSG_NODUMP);
- if (result == 0) {
- debugLogA("Gateway connection gracefully closed");
- return false;
- }
- if (result < 0) {
- debugLogA("Gateway connection error, exiting");
- return false;
- }
-
- currPacketSize += result;
- res.append(buf, result);
- }
- return true;
+ ws.run();
+ m_ws = nullptr;
}
/////////////////////////////////////////////////////////////////////////////////////////
// Binary data processing
-void WhatsAppProto::ProcessBinaryPacket(const uint8_t *pData, size_t cbDataLen)
+void WebSocket<WhatsAppProto>::process(const uint8_t *pData, size_t cbDataLen)
{
- while (size_t payloadLen = m_noise->decodeFrame(pData, cbDataLen)) {
- if (m_noise->bInitFinished) {
- MBinBuffer buf = m_noise->decrypt(pData, payloadLen);
+ while (size_t payloadLen = p->m_noise->decodeFrame(pData, cbDataLen)) {
+ if (p->m_noise->bInitFinished) {
+ MBinBuffer buf = p->m_noise->decrypt(pData, payloadLen);
WAReader rdr(buf.data(), buf.length());
auto b = rdr.readInt8();
@@ -195,22 +81,22 @@ void WhatsAppProto::ProcessBinaryPacket(const uint8_t *pData, size_t cbDataLen)
if (WANode *pNode = rdr.readNode()) {
CMStringA szText;
pNode->print(szText);
- debugLogA("Got binary node:\n%s", szText.c_str());
+ p->debugLogA("Got binary node:\n%s", szText.c_str());
- auto pHandler = FindPersistentHandler(*pNode);
+ auto pHandler = p->FindPersistentHandler(*pNode);
if (pHandler)
- (this->*pHandler)(*pNode);
+ (p->*pHandler)(*pNode);
else
- debugLogA("cannot handle incoming message");
+ p->debugLogA("cannot handle incoming message");
delete pNode;
}
else {
- debugLogA("wrong or broken payload");
- Netlib_Dump(m_hServerConn, pData, cbDataLen, false, 0);
+ p->debugLogA("wrong or broken payload");
+ Netlib_Dump(m_hConn, pData, cbDataLen, false, 0);
}
}
- else OnProcessHandshake(pData, (int)payloadLen);
+ else p->OnProcessHandshake(pData, (int)payloadLen);
pData = (BYTE*)pData + payloadLen;
cbDataLen -= payloadLen;
@@ -408,8 +294,8 @@ void WhatsAppProto::ShutdownSession()
debugLogA("WhatsAppProto::ShutdownSession");
// shutdown all resources
- if (m_hServerConn)
- Netlib_Shutdown(m_hServerConn);
+ if (m_ws)
+ m_ws->terminate();
OnLoggedOut();
}
diff --git a/protocols/WhatsApp/src/utils.cpp b/protocols/WhatsApp/src/utils.cpp
index ff1a5b5daf..133584014c 100644
--- a/protocols/WhatsApp/src/utils.cpp
+++ b/protocols/WhatsApp/src/utils.cpp
@@ -169,14 +169,14 @@ CMStringA WhatsAppProto::GenerateMessageId()
int WhatsAppProto::WSSend(const ProtobufCMessage &msg)
{
- if (m_hServerConn == nullptr)
+ if (m_ws == nullptr)
return -1;
MBinBuffer buf(proto::Serialize(&msg));
- Netlib_Dump(m_hServerConn, buf.data(), buf.length(), true, 0);
+ // Netlib_Dump(m_hServerConn, buf.data(), buf.length(), true, 0);
MBinBuffer payload = m_noise->encodeFrame(buf.data(), buf.length());
- WebSocket_SendBinary(m_hServerConn, payload.data(), payload.length());
+ m_ws->sendBinary(payload.data(), payload.length());
return 0;
}
@@ -184,7 +184,7 @@ int WhatsAppProto::WSSend(const ProtobufCMessage &msg)
int WhatsAppProto::WSSendNode(WANode &node)
{
- if (m_hServerConn == nullptr)
+ if (m_ws == nullptr)
return 0;
CMStringA szText;
@@ -196,13 +196,13 @@ int WhatsAppProto::WSSendNode(WANode &node)
MBinBuffer encData = m_noise->encrypt(writer.body.data(), writer.body.length());
MBinBuffer payload = m_noise->encodeFrame(encData.data(), encData.length());
- WebSocket_SendBinary(m_hServerConn, payload.data(), payload.length());
+ m_ws->sendBinary(payload.data(), payload.length());
return 1;
}
int WhatsAppProto::WSSendNode(WANode &node, WA_PKT_HANDLER pHandler)
{
- if (m_hServerConn == nullptr)
+ if (m_ws == nullptr)
return 0;
CMStringA id(GenerateMessageId());
@@ -217,7 +217,7 @@ int WhatsAppProto::WSSendNode(WANode &node, WA_PKT_HANDLER pHandler)
int WhatsAppProto::WSSendNode(WANode &node, WA_PKT_HANDLER_FULL pHandler, void *pUserInfo)
{
- if (m_hServerConn == nullptr)
+ if (m_ws == nullptr)
return 0;
CMStringA id(GenerateMessageId());
diff --git a/src/mir_app/src/mir_app.def b/src/mir_app/src/mir_app.def
index db2c589d2f..9486812235 100644
--- a/src/mir_app/src/mir_app.def
+++ b/src/mir_app/src/mir_app.def
@@ -676,8 +676,8 @@ UnregisterHppLogger @786
?RegisterSrmmLog@@YGPAXPAUCMPlugin@@PBDPB_WP6APAVCSrmmLogWindow@@AAVCMsgDialog@@@Z@Z @805 NONAME
?UnregisterSrmmLog@@YGXPAX@Z @806 NONAME
?GetType@CRtfLogWindow@@UAEHXZ @807 NONAME
-_WebSocket_InitHeader@12 @809 NONAME
-_WebSocket_Connect@12 @810 NONAME
+?run@MWebSocket@@QAEXXZ @809 NONAME
+?connect@MWebSocket@@QAEPAUMHttpResponse@@PAXPBDPBUMHttpHeaders@@@Z @810 NONAME
?log@CSrmmBaseDialog@@QBEPAVCSrmmLogWindow@@XZ @811 NONAME
Netlib_Dump @812 NONAME
?ProtoBroadcastAsync@PROTO_INTERFACE@@QAEXIHHPAXJ@Z @813 NONAME
@@ -736,8 +736,8 @@ _Netlib_GetTlsUnique@12 @831 NONAME
?IsPluginOnWhiteList@@YG_NPBD@Z @866 NONAME
?SetPluginOnWhiteList@@YGXPBD_N@Z @867 NONAME
Chat_Mute @868
-_WebSocket_SendBinary@12 @869 NONAME
-_WebSocket_SendText@8 @870 NONAME
+?sendBinary@MWebSocket@@QAEXPBXI@Z @869 NONAME
+?sendText@MWebSocket@@QAEXPBD@Z @870 NONAME
?OnContactAdded@PROTO_INTERFACE@@UAEXI@Z @871 NONAME
_Netlib_SslConnect@12 @872 NONAME
_Netlib_SslFree@4 @873 NONAME
@@ -977,3 +977,11 @@ g_hevEventSetJson @1109 NONAME
?addReaction@EventInfo@DB@@QAEXPBD@Z @1113 NONAME
?delReaction@EventInfo@DB@@QAEXPBD@Z @1114 NONAME
?getText@EventInfo@DB@@QBEPA_WXZ @1115 NONAME
+??0MWebSocket@@QAE@XZ @1116 NONAME
+??1MWebSocket@@QAE@XZ @1117 NONAME
+??_7MWebSocket@@6B@ @1118 NONAME
+?terminate@MWebSocket@@QAEXXZ @1119 NONAME
+??0MJsonWebSocket@@QAE@XZ @1120 NONAME
+??1MJsonWebSocket@@QAE@XZ @1121 NONAME
+??_7MJsonWebSocket@@6B@ @1122 NONAME
+?process@MJsonWebSocket@@EAEXPBEI@Z @1123 NONAME
diff --git a/src/mir_app/src/mir_app64.def b/src/mir_app/src/mir_app64.def
index f4c00a0a66..a1a44174a0 100644
--- a/src/mir_app/src/mir_app64.def
+++ b/src/mir_app/src/mir_app64.def
@@ -676,8 +676,8 @@ UnregisterHppLogger @786
?RegisterSrmmLog@@YAPEAXPEAUCMPlugin@@PEBDPEB_WP6APEAVCSrmmLogWindow@@AEAVCMsgDialog@@@Z@Z @805 NONAME
?UnregisterSrmmLog@@YAXPEAX@Z @806 NONAME
?GetType@CRtfLogWindow@@UEAAHXZ @807 NONAME
-WebSocket_InitHeader @809 NONAME
-WebSocket_Connect @810 NONAME
+?run@MWebSocket@@QEAAXXZ @809 NONAME
+?connect@MWebSocket@@QEAAPEAUMHttpResponse@@PEAXPEBDPEBUMHttpHeaders@@@Z @810 NONAME
?log@CSrmmBaseDialog@@QEBAPEAVCSrmmLogWindow@@XZ @811 NONAME
Netlib_Dump @812 NONAME
?ProtoBroadcastAsync@PROTO_INTERFACE@@QEAAXIHHPEAX_J@Z @813 NONAME
@@ -736,8 +736,8 @@ Netlib_GetTlsUnique @831 NONAME
?IsPluginOnWhiteList@@YA_NPEBD@Z @866 NONAME
?SetPluginOnWhiteList@@YAXPEBD_N@Z @867 NONAME
Chat_Mute @868
-WebSocket_SendBinary @869 NONAME
-WebSocket_SendText @870 NONAME
+?sendBinary@MWebSocket@@QEAAXPEBX_K@Z @869 NONAME
+?sendText@MWebSocket@@QEAAXPEBD@Z @870 NONAME
?OnContactAdded@PROTO_INTERFACE@@UEAAXI@Z @871 NONAME
Netlib_SslConnect @872 NONAME
Netlib_SslFree @873 NONAME
@@ -977,3 +977,11 @@ g_hevEventSetJson @1103 NONAME
?addReaction@EventInfo@DB@@QEAAXPEBD@Z @1107 NONAME
?delReaction@EventInfo@DB@@QEAAXPEBD@Z @1108 NONAME
?getText@EventInfo@DB@@QEBAPEA_WXZ @1109 NONAME
+??0MWebSocket@@QEAA@XZ @1110 NONAME
+??1MWebSocket@@QEAA@XZ @1111 NONAME
+??_7MWebSocket@@6B@ @1112 NONAME
+?terminate@MWebSocket@@QEAAXXZ @1113 NONAME
+??0MJsonWebSocket@@QEAA@XZ @1114 NONAME
+??1MJsonWebSocket@@QEAA@XZ @1115 NONAME
+??_7MJsonWebSocket@@6B@ @1116 NONAME
+?process@MJsonWebSocket@@EEAAXPEBE_K@Z @1117 NONAME
diff --git a/src/mir_app/src/netlib_websocket.cpp b/src/mir_app/src/netlib_websocket.cpp
index 9ce6f72be9..b92d3c168f 100644
--- a/src/mir_app/src/netlib_websocket.cpp
+++ b/src/mir_app/src/netlib_websocket.cpp
@@ -27,8 +27,73 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#include "../../libs/zlib/src/zlib.h"
-MIR_APP_DLL(MHttpResponse*) WebSocket_Connect(HNETLIBUSER nlu, const char *szHost, const MHttpHeaders *pHeaders)
+struct WSHeader
{
+ WSHeader()
+ {
+ memset(this, 0, sizeof(*this));
+ }
+
+ bool bIsFinal, bIsMasked;
+ int opCode, firstByte;
+ size_t payloadSize, headerSize;
+
+ bool init(const void *pData, size_t bufSize)
+ {
+ if (bufSize < 2)
+ return false;
+
+ auto *buf = (const uint8_t *)pData;
+ bIsFinal = (buf[0] & 0x80) != 0;
+ bIsMasked = (buf[1] & 0x80) != 0;
+ opCode = buf[0] & 0x0F;
+ firstByte = buf[1] & 0x7F;
+ headerSize = 2 + (firstByte == 0x7E ? 2 : 0) + (firstByte == 0x7F ? 8 : 0) + (bIsMasked ? 4 : 0);
+ if (bufSize < headerSize)
+ return false;
+
+ uint64_t tmpSize = 0;
+ switch (firstByte) {
+ case 0x7F:
+ tmpSize += ((uint64_t)buf[2]) << 56;
+ tmpSize += ((uint64_t)buf[3]) << 48;
+ tmpSize += ((uint64_t)buf[4]) << 40;
+ tmpSize += ((uint64_t)buf[5]) << 32;
+ tmpSize += ((uint64_t)buf[6]) << 24;
+ tmpSize += ((uint64_t)buf[7]) << 16;
+ tmpSize += ((uint64_t)buf[8]) << 8;
+ tmpSize += ((uint64_t)buf[9]);
+ break;
+
+ case 0x7E:
+ tmpSize += ((uint64_t)buf[2]) << 8;
+ tmpSize += ((uint64_t)buf[3]);
+ break;
+
+ default:
+ tmpSize = firstByte;
+ }
+ payloadSize = tmpSize;
+ return true;
+ }
+};
+
+MWebSocket::MWebSocket()
+{
+}
+
+MWebSocket::~MWebSocket()
+{
+ if (m_hConn)
+ Netlib_CloseHandle(m_hConn);
+}
+
+////////////////////////////////////////////////////////////////////////////////////////
+
+MHttpResponse* MWebSocket::connect(HANDLE nlu, const char *szHost, const MHttpHeaders *pHeaders)
+{
+ m_nlu = (HNETLIBUSER)nlu;
+
CMStringA tmpHost(szHost);
// connect to the gateway server
@@ -54,57 +119,19 @@ MIR_APP_DLL(MHttpResponse*) WebSocket_Connect(HNETLIBUSER nlu, const char *szHos
for (auto &it: *pHeaders)
nlhr.AddHeader(it->szName, it->szValue);
- auto *pReply = Netlib_HttpTransaction(nlu, &nlhr);
+ auto *pReply = Netlib_HttpTransaction(m_nlu, &nlhr);
if (pReply == nullptr) {
- Netlib_Logf(nlu, "Error establishing WebSocket connection to %s, send failed", tmpHost.c_str());
+ Netlib_Logf(m_nlu, "Error establishing WebSocket connection to %s, send failed", tmpHost.c_str());
return nullptr;
}
+ m_hConn = pReply->nlc;
if (pReply->resultCode != 101)
- Netlib_Logf(nlu, "Error establishing WebSocket connection to %s, status %d", tmpHost.c_str(), pReply->resultCode);
+ Netlib_Logf(m_nlu, "Error establishing WebSocket connection to %s, status %d", tmpHost.c_str(), pReply->resultCode);
return pReply;
}
-MIR_APP_DLL(bool) WebSocket_InitHeader(WSHeader &hdr, const void *pData, size_t bufSize)
-{
- if (bufSize < 2)
- return false;
-
- auto *buf = (const uint8_t *)pData;
- hdr.bIsFinal = (buf[0] & 0x80) != 0;
- hdr.bIsMasked = (buf[1] & 0x80) != 0;
- hdr.opCode = buf[0] & 0x0F;
- hdr.firstByte = buf[1] & 0x7F;
- hdr.headerSize = 2 + (hdr.firstByte == 0x7E ? 2 : 0) + (hdr.firstByte == 0x7F ? 8 : 0) + (hdr.bIsMasked ? 4 : 0);
- if (bufSize < hdr.headerSize)
- return false;
-
- uint64_t tmpSize = 0;
- switch (hdr.firstByte) {
- case 0x7F:
- tmpSize += ((uint64_t)buf[2]) << 56;
- tmpSize += ((uint64_t)buf[3]) << 48;
- tmpSize += ((uint64_t)buf[4]) << 40;
- tmpSize += ((uint64_t)buf[5]) << 32;
- tmpSize += ((uint64_t)buf[6]) << 24;
- tmpSize += ((uint64_t)buf[7]) << 16;
- tmpSize += ((uint64_t)buf[8]) << 8;
- tmpSize += ((uint64_t)buf[9]);
- break;
-
- case 0x7E:
- tmpSize += ((uint64_t)buf[2]) << 8;
- tmpSize += ((uint64_t)buf[3]);
- break;
-
- default:
- tmpSize = hdr.firstByte;
- }
- hdr.payloadSize = tmpSize;
- return true;
-}
-
/////////////////////////////////////////////////////////////////////////////////////////
static void WebSocket_Send(HNETLIBCONN nlc, const void *pData, int64_t dataLen, uint8_t opCode)
@@ -156,14 +183,131 @@ static void WebSocket_Send(HNETLIBCONN nlc, const void *pData, int64_t dataLen,
Netlib_Send(nlc, sendBuf, int(dataLen + cbLen), MSG_NODUMP);
}
-MIR_APP_DLL(void) WebSocket_SendText(HNETLIBCONN nlc, const char *pData)
+void MWebSocket::sendText(const char *pData)
+{
+ if (m_hConn && pData) {
+ mir_cslock lck(m_cs);
+ WebSocket_Send(m_hConn, pData, strlen(pData), 1);
+ }
+}
+
+void MWebSocket::sendBinary(const void *pData, size_t dataLen)
{
- if (nlc && pData)
- WebSocket_Send(nlc, pData, strlen(pData), 1);
+ if (m_hConn && pData) {
+ mir_cslock lck(m_cs);
+ WebSocket_Send(m_hConn, pData, dataLen, 2);
+ }
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
+
+void MWebSocket::terminate()
+{
+ m_bTerminated = true;
+
+ if (m_hConn)
+ Netlib_Shutdown(m_hConn);
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
+
+void MWebSocket::run()
+{
+ int offset = 0;
+ MBinBuffer netbuf;
+
+ while (!m_bTerminated) {
+ unsigned char buf[2048];
+ int bufSize = Netlib_Recv(m_hConn, (char *)buf + offset, _countof(buf) - offset, MSG_NODUMP);
+ if (bufSize == 0) {
+ Netlib_Log(m_nlu, "Websocket connection gracefully closed");
+ break;
+ }
+ if (bufSize < 0) {
+ Netlib_Log(m_nlu, "Websocket connection error, exiting");
+ break;
+ }
+
+ WSHeader hdr;
+ if (!hdr.init(buf, bufSize)) {
+ offset += bufSize;
+ continue;
+ }
+ offset = 0;
+
+ // we have some additional data, not only opcode
+ if ((size_t)bufSize > hdr.headerSize) {
+ size_t currPacketSize = bufSize - hdr.headerSize;
+ netbuf.append(buf, bufSize);
+ while (currPacketSize < hdr.payloadSize) {
+ int result = Netlib_Recv(m_hConn, (char *)buf, _countof(buf), MSG_NODUMP);
+ if (result == 0) {
+ Netlib_Log(m_nlu, "Websocket connection gracefully closed");
+ break;
+ }
+ if (result < 0) {
+ Netlib_Log(m_nlu, "Websocket connection error, exiting");
+ break;
+ }
+ currPacketSize += result;
+ netbuf.append(buf, result);
+ }
+ }
+
+ // read all payloads from the current buffer, one by one
+ size_t prevSize = 0;
+ while (true) {
+ switch (hdr.opCode) {
+ case 0: // text packet
+ case 1: // binary packet
+ case 2: // continuation
+ if (hdr.bIsFinal) {
+ // process a packet here
+ process((uint8_t*)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize);
+ }
+ break;
+
+ case 8: // close
+ Netlib_Log(m_nlu, "server required to exit");
+ m_bTerminated = true; // simply reconnect, don't exit
+ break;
+
+ case 9: // ping
+ Netlib_Log(m_nlu, "ping received");
+ Netlib_Send(m_hConn, (char *)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0);
+ break;
+ }
+
+ if (hdr.bIsFinal)
+ netbuf.remove(hdr.headerSize + hdr.payloadSize);
+
+ if (netbuf.length() == 0)
+ break;
+
+ // if we have not enough data for header, continue reading
+ if (!hdr.init(netbuf.data(), netbuf.length()))
+ break;
+
+ // if we have not enough data for data, continue reading
+ if (hdr.headerSize + hdr.payloadSize > netbuf.length())
+ break;
+
+ if (prevSize == netbuf.length()) {
+ netbuf.remove(prevSize);
+ break;
+ }
+
+ prevSize = netbuf.length();
+ }
+ }
}
-MIR_APP_DLL(void) WebSocket_SendBinary(HNETLIBCONN nlc, const void *pData, size_t dataLen)
+void MJsonWebSocket::process(const uint8_t *buf, size_t cbLen)
{
- if (nlc && pData)
- WebSocket_Send(nlc, pData, dataLen, 2);
+ CMStringA szJson((char*)buf, (int)cbLen);
+ Netlib_Logf(m_nlu, "JSON received:\n%s", szJson.c_str());
+
+ JSONNode root = JSONNode::parse(szJson);
+ if (root)
+ process(root);
}