summaryrefslogtreecommitdiff
path: root/plugins/!NotAdopted/VypressChat/msgloop.c
blob: 6815394c07ebaca646ad5f03ac5a136eaa7d156b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/*
 * Miranda-IM Vypress Chat/quickChat plugins
 * Copyright (C) Saulius Menkevicius
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 * $Id: msgloop.c,v 1.13 2005/03/08 17:42:25 bobas Exp $
 */

#include "main.h"
#include "msgloop.h"
#include "pthread.h"

/* constants and struct defs
 */

/* max packet output rate is 2 packets per sec */
#define PACKET_OUTPUT_RATE	500

struct msgloop_message_list_entry {
	struct msgloop_message_list_entry * prev, * next;
	vqp_msg_t msg;
};

/* static data
 */
static PAPCFUNC	s_lpfnMsgHandleApc;
static HANDLE	s_hMainThread, s_hLoopThread;
static BOOL	s_fStopLoop = TRUE,
		s_fLoopStarted;
static HANDLE	s_hStopWaitEvent;

static UINT_PTR s_outputTimer;
static struct msgloop_message_list_entry
		* s_outputListHead, * s_outputListTail;
static BOOL	s_outputSentThisTick = FALSE, s_outputSentThisTickInTimer;

/* static routines
 */

static void CALLBACK msgloop_apc_stop(ULONG_PTR unused)
{
	s_fStopLoop = TRUE;
}

static void CALLBACK msgloop_apc_send_msg(ULONG_PTR vqmsg)
{
	vqp_link_send((vqp_msg_t)vqmsg);
	vqp_msg_free((vqp_msg_t)vqmsg);
}

static void msgloop_loop(void * vqlink)
{
	WSAEVENT hReadEvent;
	SOCKET rx_socket = vqp_link_rx_socket((vqp_link_t) vqlink);

	s_fLoopStarted = TRUE;

	hReadEvent = WSACreateEvent();
	WSAEventSelect(rx_socket, hReadEvent, FD_READ);

	while(!Miranda_Terminated() && !s_fStopLoop) {
		DWORD nEvent = WSAWaitForMultipleEvents(
				1, &hReadEvent, FALSE, WSA_INFINITE, TRUE);

		if(nEvent==WSA_WAIT_EVENT_0) {
			vqp_msg_t msg;

			WSAResetEvent(hReadEvent);
	
			if(!vqp_link_recv((vqp_link_t) vqlink, &msg)) {
				QueueUserAPC(
					s_lpfnMsgHandleApc,
					s_hMainThread, (ULONG_PTR)msg);
			}
		}
	}

	WSACloseEvent(hReadEvent);

	/* wait for pending user APC */
	while(SleepEx(10, TRUE) == WAIT_IO_COMPLETION) /* nothing */;

	/* signal that we've finished */
	SetEvent(s_hStopWaitEvent);
}

static void CALLBACK msgloop_message_output_timer_cb(
	HWND hwnd, UINT nmsg, UINT_PTR idevent, DWORD dwtime)
{
	if(s_outputListTail && (!s_outputSentThisTick
					|| (s_outputSentThisTick && s_outputSentThisTickInTimer)))
	{
		struct msgloop_message_list_entry * entry = s_outputListTail;
		QueueUserAPC(msgloop_apc_send_msg, s_hLoopThread, (ULONG_PTR)entry->msg);

		s_outputListTail = entry->next;
		if(s_outputListTail) {
			s_outputListTail->prev = NULL;
		} else {
			s_outputListHead = NULL;
		}

		free(entry);

		s_outputSentThisTick = TRUE;
		s_outputSentThisTickInTimer = TRUE;
	} else {
		s_outputSentThisTick = FALSE;
	}
}

/* exported routines
 */

/* msgloop_start:
 *	starts msg loop
 */
void msgloop_start(vqp_link_t vqlink, PAPCFUNC lpfMsgHandlerApc)
{
	ASSERT_RETURNIFFAIL(s_fStopLoop);
			
	s_lpfnMsgHandleApc = lpfMsgHandlerApc;
	DuplicateHandle(
		GetCurrentProcess(), GetCurrentThread(),
		GetCurrentProcess(), &s_hMainThread,
		THREAD_SET_CONTEXT, FALSE, 0);
	
	s_fStopLoop = FALSE;
	s_fLoopStarted = FALSE;	/* this will be set in msgloop_loop() */
	s_hLoopThread = (HANDLE)pthread_create(msgloop_loop, (void*)vqlink);

	/* start scheduler timer:
	 *	this will make packets to be send no faster than specific rate
	 */
	s_outputTimer = SetTimer(NULL, 0, PACKET_OUTPUT_RATE, msgloop_message_output_timer_cb);
	s_outputSentThisTick = FALSE;
}

/* msgloop_stop:
 *	msgloop_stop should be called from miranda's gui thread
 *	to stop the qcs_msg loop.
 */
void msgloop_stop()
{
	struct msgloop_message_list_entry * entry;

	ASSERT_RETURNIFFAIL(!s_fStopLoop);

	/* stop packet scheduler timer
	 */
	KillTimer(NULL, s_outputTimer);
	s_outputTimer = 0;

	/* signal the thread to stop */
	s_hStopWaitEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
	QueueUserAPC(msgloop_apc_stop, s_hLoopThread, (ULONG_PTR)NULL);
	
	/* wait for the thread to stop
	 * and handle all the queued APCs
	 */
	while(WaitForSingleObjectEx(s_hStopWaitEvent, 10, TRUE) == WAIT_IO_COMPLETION)
		/* do nothing */ ;

	CloseHandle(s_hStopWaitEvent);
	CloseHandle(s_hMainThread);

	/* do cleanup */
	s_fLoopStarted = FALSE;
	
	/* send all the queued packets that didn't get sent
	 * by packet scheduler in msgloop_packet_output_timer_c()
	 */
	entry = s_outputListTail;
	while(entry) {
		struct msgloop_message_list_entry * next = entry->next;
		vqp_link_send(entry->msg);
		vqp_msg_free(entry->msg);
		free(entry);

		entry = next;
	}
	s_outputListTail = NULL;
	s_outputListHead = NULL;
}

/* msgloop_send:
 *	msgloop_send should be used to send a message from miranda's
 *	gui thread to send a message with qcs_msg loop.
 */
void msgloop_send(vqp_msg_t msg, int never_wait)
{
	ASSERT_RETURNIFFAIL(VALIDPTR(msg));

	/* send this message */
	if(s_fLoopStarted && s_outputTimer) {
		/* check if we've sent a packet this tick,
		 * and if we did, we'll have to wait for the next tick
		 */
		if(s_outputSentThisTick && !never_wait) {
			/* add msg to message queue for scheduler timer
			 */
			struct msgloop_message_list_entry * entry;
			entry = malloc(sizeof(struct msgloop_message_list_entry));
			entry->msg = msg;
			entry->next = NULL;
			entry->prev = s_outputListHead;
			if(entry->prev) {
				entry->prev->next = entry;
			} else {
				s_outputListTail = entry;
			}
			s_outputListHead = entry;
		} else {
			QueueUserAPC(msgloop_apc_send_msg, s_hLoopThread, (ULONG_PTR)msg);
			s_outputSentThisTick = TRUE;
		}
	} else {
		if(s_outputSentThisTick && !never_wait) {
			Sleep(PACKET_OUTPUT_RATE);
		}
		vqp_link_send(msg);
		vqp_msg_free(msg);
		s_outputSentThisTick = TRUE;
		s_outputSentThisTickInTimer = FALSE;
	}
}

/* msgloop_send_to:
 *	sends message to the specified address only
 */
void msgloop_send_to(vqp_msg_t msg, int never_wait, vqp_addr_t addr) {
	vqp_msg_set_dst_addr(msg, addr);
	msgloop_send(msg, never_wait);
}