diff options
Diffstat (limited to 'protocols/Discord')
-rw-r--r-- | protocols/Discord/src/connection.cpp | 4 | ||||
-rw-r--r-- | protocols/Discord/src/gateway.cpp | 155 | ||||
-rw-r--r-- | protocols/Discord/src/proto.cpp | 4 | ||||
-rw-r--r-- | protocols/Discord/src/proto.h | 16 | ||||
-rw-r--r-- | protocols/Discord/src/voice_client.cpp | 159 |
5 files changed, 59 insertions, 279 deletions
diff --git a/protocols/Discord/src/connection.cpp b/protocols/Discord/src/connection.cpp index 96cc32ee25..ff7e765ecb 100644 --- a/protocols/Discord/src/connection.cpp +++ b/protocols/Discord/src/connection.cpp @@ -93,8 +93,8 @@ void CDiscordProto::ShutdownSession() pMfaDialog->Close();
if (m_hWorkerThread)
SetEvent(m_evRequestsQueue);
- if (m_hGatewayConnection)
- Netlib_Shutdown(m_hGatewayConnection);
+ if (m_ws)
+ m_ws->terminate();
if (m_hAPIConnection)
Netlib_Shutdown(m_hAPIConnection);
diff --git a/protocols/Discord/src/gateway.cpp b/protocols/Discord/src/gateway.cpp index dc7d1da26e..d6b7846eeb 100644 --- a/protocols/Discord/src/gateway.cpp +++ b/protocols/Discord/src/gateway.cpp @@ -22,12 +22,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. bool CDiscordProto::GatewaySend(const JSONNode &pRoot)
{
- if (m_hGatewayConnection == nullptr)
+ if (m_ws == nullptr)
return false;
json_string szText = pRoot.write();
debugLogA("Gateway send: %s", szText.c_str());
- WebSocket_SendText(m_hGatewayConnection, szText.c_str());
+ m_ws->sendText(szText.c_str());
return true;
}
@@ -51,7 +51,8 @@ bool CDiscordProto::GatewayThreadWorker() hdrs.AddHeader("Cookie", m_szWSCookie);
}
- NLHR_PTR pReply(WebSocket_Connect(m_hGatewayNetlibUser, m_szGateway + "/?encoding=json&v=8", &hdrs));
+ JsonWebSocket<CDiscordProto> ws(this);
+ NLHR_PTR pReply(ws.connect(m_hGatewayNetlibUser, m_szGateway + "/?encoding=json&v=8", &hdrs));
if (pReply == nullptr) {
debugLogA("Gateway connection failed, exiting");
return false;
@@ -72,164 +73,62 @@ bool CDiscordProto::GatewayThreadWorker() // succeeded!
debugLogA("Gateway connection succeeded");
- m_hGatewayConnection = pReply->nlc;
-
- bool bExit = false;
- int offset = 0;
- MBinBuffer netbuf;
-
- while (!bExit) {
- if (m_bTerminated)
- break;
-
- unsigned char buf[2048];
- int bufSize = Netlib_Recv(m_hGatewayConnection, (char*)buf + offset, _countof(buf) - offset, MSG_NODUMP);
- if (bufSize == 0) {
- debugLogA("Gateway connection gracefully closed");
- bExit = !m_bTerminated;
- break;
- }
- if (bufSize < 0) {
- debugLogA("Gateway connection error, exiting");
- break;
- }
- WSHeader hdr;
- if (!WebSocket_InitHeader(hdr, buf, bufSize)) {
- offset += bufSize;
- continue;
- }
- offset = 0;
-
- debugLogA("Got packet: buffer = %d, opcode = %d, headerSize = %d, final = %d, masked = %d", bufSize, hdr.opCode, hdr.headerSize, hdr.bIsFinal, hdr.bIsMasked);
-
- // we have some additional data, not only opcode
- if ((size_t)bufSize > hdr.headerSize) {
- size_t currPacketSize = bufSize - hdr.headerSize;
- netbuf.append(buf, bufSize);
- while (currPacketSize < hdr.payloadSize) {
- int result = Netlib_Recv(m_hGatewayConnection, (char*)buf, _countof(buf), MSG_NODUMP);
- if (result == 0) {
- debugLogA("Gateway connection gracefully closed");
- bExit = !m_bTerminated;
- break;
- }
- if (result < 0) {
- debugLogA("Gateway connection error, exiting");
- break;
- }
- currPacketSize += result;
- netbuf.append(buf, result);
- }
- }
-
- // read all payloads from the current buffer, one by one
- size_t prevSize = 0;
- while (true) {
- switch (hdr.opCode) {
- case 0: // text packet
- case 1: // binary packet
- case 2: // continuation
- if (hdr.bIsFinal) {
- // process a packet here
- CMStringA szJson((char*)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize);
- debugLogA("JSON received:\n%s", szJson.c_str());
- JSONNode root = JSONNode::parse(szJson);
- if (root)
- bExit = GatewayProcess(root);
- }
- break;
-
- case 8: // close
- debugLogA("server required to exit");
- bExit = true; // simply reconnect, don't exit
- break;
-
- case 9: // ping
- debugLogA("ping received");
- Netlib_Send(m_hGatewayConnection, (char*)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0);
- break;
- }
-
- if (hdr.bIsFinal)
- netbuf.remove(hdr.headerSize + hdr.payloadSize);
-
- if (netbuf.length() == 0)
- break;
-
- // if we have not enough data for header, continue reading
- if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length()))
- break;
-
- // if we have not enough data for data, continue reading
- if (hdr.headerSize + hdr.payloadSize > netbuf.length())
- break;
-
- debugLogA("Got inner packet: buffer = %d, opcode = %d, headerSize = %d, payloadSize = %d, final = %d, masked = %d", netbuf.length(), hdr.opCode, hdr.headerSize, hdr.payloadSize, hdr.bIsFinal, hdr.bIsMasked);
- if (prevSize == netbuf.length()) {
- netbuf.remove(prevSize);
- debugLogA("dropping current packet, exiting");
- break;
- }
-
- prevSize = netbuf.length();
- }
- }
+ m_ws = &ws;
+ ws.run();
- Netlib_CloseHandle(m_hGatewayConnection);
- m_hGatewayConnection = nullptr;
- return bExit;
+ m_ws = nullptr;
+ return true;
}
//////////////////////////////////////////////////////////////////////////////////////
// handles server commands
-bool CDiscordProto::GatewayProcess(const JSONNode &pRoot)
+void JsonWebSocket<CDiscordProto>::process(const JSONNode &json)
{
- int opCode = pRoot["op"].as_int();
+ int opCode = json["op"].as_int();
switch (opCode) {
case OPCODE_DISPATCH: // process incoming command
{
- int iSeq = pRoot["s"].as_int();
+ int iSeq = json["s"].as_int();
if (iSeq != 0)
- m_iGatewaySeq = iSeq;
+ p->m_iGatewaySeq = iSeq;
+
+ CMStringW wszCommand = json["t"].as_mstring();
+ p->debugLogA("got a server command to dispatch: %S", wszCommand.c_str());
- CMStringW wszCommand = pRoot["t"].as_mstring();
- debugLogA("got a server command to dispatch: %S", wszCommand.c_str());
-
- GatewayHandlerFunc pFunc = GetHandler(wszCommand);
+ GatewayHandlerFunc pFunc = p->GetHandler(wszCommand);
if (pFunc)
- (this->*pFunc)(pRoot["d"]);
+ (p->*pFunc)(json["d"]);
}
break;
case OPCODE_RECONNECT: // we need to reconnect asap
- debugLogA("we need to reconnect, leaving worker thread");
- return true;
+ p->debugLogA("we need to reconnect, leaving worker thread");
+ p->m_bTerminated = true;
+ return;
case OPCODE_INVALID_SESSION: // session invalidated
- if (pRoot["d"].as_bool()) // session can be resumed
- GatewaySendResume();
+ if (json["d"].as_bool()) // session can be resumed
+ p->GatewaySendResume();
else {
Sleep(5000); // 5 seconds - recommended timeout
- GatewaySendIdentify();
+ p->GatewaySendIdentify();
}
break;
case OPCODE_HELLO: // hello
- m_iHartbeatInterval = pRoot["d"]["heartbeat_interval"].as_int();
+ p->m_iHartbeatInterval = json["d"]["heartbeat_interval"].as_int();
- GatewaySendIdentify();
+ p->GatewaySendIdentify();
break;
-
+
case OPCODE_HEARTBEAT_ACK: // heartbeat ack
break;
default:
- debugLogA("ACHTUNG! Unknown opcode: %d, report it to developer", opCode);
+ p->debugLogA("ACHTUNG! Unknown opcode: %d, report it to developer", opCode);
}
-
- return false;
}
//////////////////////////////////////////////////////////////////////////////////////
diff --git a/protocols/Discord/src/proto.cpp b/protocols/Discord/src/proto.cpp index aef8da827b..7cfeba6d56 100644 --- a/protocols/Discord/src/proto.cpp +++ b/protocols/Discord/src/proto.cpp @@ -206,8 +206,8 @@ void CDiscordProto::OnShutdown() for (auto &it : arGuilds)
it->SaveToFile();
- if (m_hGatewayConnection)
- Netlib_Shutdown(m_hGatewayConnection);
+ if (m_ws)
+ m_ws->terminate();
if (g_plugin.bVoiceService)
CallService(MS_VOICESERVICE_UNREGISTER, (WPARAM)m_szModuleName, 0);
diff --git a/protocols/Discord/src/proto.h b/protocols/Discord/src/proto.h index 5f3e88d4c9..2946e677f8 100644 --- a/protocols/Discord/src/proto.h +++ b/protocols/Discord/src/proto.h @@ -173,8 +173,9 @@ class CDiscordVoiceCall : public MZeroedObject CDiscordProto *ppro;
+ JsonWebSocket<CDiscordVoiceCall> *m_ws;
+
CTimer m_timer;
- HNETLIBCONN m_hConn;
HNETLIBBIND m_hBind;
mir_cs m_cs;
bool m_bTerminated;
@@ -189,10 +190,8 @@ class CDiscordVoiceCall : public MZeroedObject void onTimer(CTimer *)
{
- if (m_hConn) {
- JSONNode d; d << INT_PARAM("", rand());
- write(3, d);
- }
+ JSONNode d; d << INT_PARAM("", rand());
+ write(3, d);
}
static void GetConnection(HNETLIBCONN /*hNewConnection*/, uint32_t /*dwRemoteIP*/, void *pExtra);
@@ -209,7 +208,6 @@ public: return !m_bTerminated;
}
- bool connect(HNETLIBUSER);
void write(int op, JSONNode &d);
void process(const JSONNode &node);
@@ -298,6 +296,7 @@ class CDiscordProto : public PROTO<CDiscordProto> friend class CMfaDialog;
friend class CGroupchatInviteDlg;
friend class CDiscordVoiceCall;
+ friend class JsonWebSocket<CDiscordProto>;
class CDiscordProtoImpl
{
@@ -370,13 +369,12 @@ class CDiscordProto : public PROTO<CDiscordProto> m_szWSCookie; // cookie used for establishing websocket connection
HNETLIBUSER m_hGatewayNetlibUser; // the separate netlib user handle for gateways
- HNETLIBCONN m_hGatewayConnection; // gateway connection
-
+ JsonWebSocket<CDiscordProto> *m_ws;
+
void __cdecl GatewayThread(void*);
bool GatewayThreadWorker(void);
bool GatewaySend(const JSONNode &pNode);
- bool GatewayProcess(const JSONNode &pNode);
void GatewaySendGuildInfo(CDiscordGuild *pGuild);
void GatewaySendHeartbeat(void);
diff --git a/protocols/Discord/src/voice_client.cpp b/protocols/Discord/src/voice_client.cpp index 09e1908007..29b051799f 100644 --- a/protocols/Discord/src/voice_client.cpp +++ b/protocols/Discord/src/voice_client.cpp @@ -32,6 +32,8 @@ CDiscordVoiceCall::CDiscordVoiceCall(CDiscordProto *pOwner) : CDiscordVoiceCall::~CDiscordVoiceCall() { + delete m_ws; + m_timer.StopSafe(); if (m_encoder) { @@ -44,50 +46,17 @@ CDiscordVoiceCall::~CDiscordVoiceCall() m_repacketizer = nullptr; } - if (m_hConn) { - Netlib_CloseHandle(m_hConn); - m_hConn = nullptr; - } - if (m_hBind) { Netlib_CloseHandle(m_hBind); m_hBind = nullptr; } } -bool CDiscordVoiceCall::connect(HNETLIBUSER hServer) -{ - int nLoops = 0; - time_t lastLoopTime = time(0); - - while (true) { - time_t currTime = time(0); - if (currTime - lastLoopTime > 3) - nLoops = 0; - - nLoops++; - if (nLoops > 5) - break; - - lastLoopTime = currTime; - - MHttpHeaders hdrs; - hdrs.AddHeader("Origin", "https://discord.com"); - - NLHR_PTR pReply(WebSocket_Connect(hServer, szEndpoint + "/?encoding=json&v=8", &hdrs)); - if (pReply && pReply->resultCode == 101) { - m_hConn = pReply->nlc; - return true; - } - - SleepEx(5000, TRUE); - } - - return false; -} - void CDiscordVoiceCall::write(int op, JSONNode &d) { + if (m_ws == nullptr) + return; + d.set_name("d"); JSONNode payload; @@ -97,7 +66,12 @@ void CDiscordVoiceCall::write(int op, JSONNode &d) ppro->debugLogA("Voice JSON sent: %s", json.c_str()); mir_cslock lck(m_cs); - WebSocket_SendText(m_hConn, json.c_str()); + m_ws->sendText(json.c_str()); +} + +void JsonWebSocket<CDiscordVoiceCall>::process(const JSONNode &json) +{ + p->process(json); } ////////////////////////////////////////////////////////////////////////////////////////// @@ -186,111 +160,20 @@ void CDiscordProto::VoiceClientThread(void *param) auto *pCall = (CDiscordVoiceCall *)param; debugLogA("Entering voice websocket thread"); - if (!pCall->connect(m_hGatewayNetlibUser)) { + MHttpHeaders hdrs; + hdrs.AddHeader("Origin", "https://discord.com"); + + JsonWebSocket<CDiscordVoiceCall> ws(pCall); + + NLHR_PTR pReply(ws.connect(m_hGatewayNetlibUser, pCall->szEndpoint + "/?encoding=json&v=8", &hdrs)); + if (!pReply || pReply->resultCode != 101) { debugLogA("Voice gateway connection failed, exiting"); return; } - int offset = 0; - MBinBuffer netbuf; - - while (*pCall) { - if (m_bTerminated) - break; - - unsigned char buf[2048]; - int bufSize = Netlib_Recv(pCall->m_hConn, (char *)buf + offset, _countof(buf) - offset, MSG_NODUMP); - if (bufSize == 0) { - debugLogA("Voice gateway connection gracefully closed"); - pCall->m_bTerminated = !m_bTerminated; - break; - } - if (bufSize < 0) { - debugLogA("Voice gateway connection error, exiting"); - break; - } - - WSHeader hdr; - if (!WebSocket_InitHeader(hdr, buf, bufSize)) { - offset += bufSize; - continue; - } - offset = 0; - - // we have some additional data, not only opcode - if ((size_t)bufSize > hdr.headerSize) { - size_t currPacketSize = bufSize - hdr.headerSize; - netbuf.append(buf, bufSize); - while (currPacketSize < hdr.payloadSize) { - int result = Netlib_Recv(pCall->m_hConn, (char *)buf, _countof(buf), MSG_NODUMP); - if (result == 0) { - debugLogA("Voice gateway connection gracefully closed"); - pCall->m_bTerminated = !m_bTerminated; - break; - } - if (result < 0) { - debugLogA("Voice gateway connection error, exiting"); - break; - } - currPacketSize += result; - netbuf.append(buf, result); - } - } - - // read all payloads from the current buffer, one by one - size_t prevSize = 0; - while (true) { - switch (hdr.opCode) { - case 0: // text packet - case 1: // binary packet - case 2: // continuation - if (hdr.bIsFinal) { - // process a packet here - CMStringA szJson((char *)netbuf.data() + hdr.headerSize, (int)hdr.payloadSize); - debugLogA("Voice JSON received:\n%s", szJson.c_str()); - JSONNode root = JSONNode::parse(szJson); - if (root) - pCall->process(root); - } - break; - - case 8: // close - debugLogA("Voice server required to exit"); - pCall->m_bTerminated = true; // simply reconnect, don't exit - break; - - case 9: // ping - debugLogA("ping received"); - Netlib_Send(pCall->m_hConn, (char *)buf + hdr.headerSize, bufSize - int(hdr.headerSize), 0); - break; - } - - if (hdr.bIsFinal) - netbuf.remove(hdr.headerSize + hdr.payloadSize); - - if (netbuf.length() == 0) - break; - - // if we have not enough data for header, continue reading - if (!WebSocket_InitHeader(hdr, netbuf.data(), netbuf.length())) - break; - - // if we have not enough data for data, continue reading - if (hdr.headerSize + hdr.payloadSize > netbuf.length()) - break; - - if (prevSize == netbuf.length()) { - netbuf.remove(prevSize); - debugLogA("dropping current packet, exiting"); - break; - } - - prevSize = netbuf.length(); - } - } - - debugLogA("Exiting voice websocket thread"); - Netlib_CloseHandle(pCall->m_hConn); pCall->m_hConn = 0; + pCall->m_ws = &ws; + ws.run(); + pCall->m_ws = nullptr; } ///////////////////////////////////////////////////////////////////////////////////////// |