/* * libjingle * Copyright 2004--2005, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #ifdef POSIX extern "C" { #include } #endif #include "talk/base/common.h" #include "talk/base/logging.h" #include "talk/base/thread.h" #include "talk/base/time.h" #define MSDEV_SET_THREAD_NAME 0x406D1388 namespace talk_base { ThreadManager g_thmgr; #ifdef POSIX pthread_key_t ThreadManager::key_; ThreadManager::ThreadManager() { pthread_key_create(&key_, NULL); main_thread_ = new Thread(); SetCurrent(main_thread_); } ThreadManager::~ThreadManager() { pthread_key_delete(key_); delete main_thread_; } Thread *ThreadManager::CurrentThread() { return (Thread *)pthread_getspecific(key_); } void ThreadManager::SetCurrent(Thread *thread) { pthread_setspecific(key_, thread); } #endif #ifdef WIN32 DWORD ThreadManager::key_; ThreadManager::ThreadManager() { key_ = TlsAlloc(); main_thread_ = new Thread(); SetCurrent(main_thread_); } ThreadManager::~ThreadManager() { TlsFree(key_); delete main_thread_; } Thread *ThreadManager::CurrentThread() { return (Thread *)TlsGetValue(key_); } void ThreadManager::SetCurrent(Thread *thread) { TlsSetValue(key_, thread); } #endif void ThreadManager::Add(Thread *thread) { CritScope cs(&crit_); threads_.push_back(thread); } void ThreadManager::Remove(Thread *thread) { CritScope cs(&crit_); threads_.erase(std::remove(threads_.begin(), threads_.end(), thread), threads_.end()); } Thread::Thread(SocketServer* ss) : MessageQueue(ss), priority_(PRIORITY_NORMAL) { g_thmgr.Add(this); started_ = false; has_sends_ = false; } Thread::~Thread() { Stop(); if (active_) Clear(NULL); g_thmgr.Remove(this); } #ifdef POSIX void Thread::Start() { pthread_attr_t attr; pthread_attr_init(&attr); if (priority_ == PRIORITY_IDLE) { struct sched_param param; pthread_attr_getschedparam(&attr, ¶m); param.sched_priority = 15; // +15 = pthread_attr_setschedparam(&attr, ¶m); } pthread_create(&thread_, &attr, PreRun, this); started_ = true; } void Thread::Join() { if (started_) { void *pv; pthread_join(thread_, &pv); } } #endif #ifdef WIN32 typedef struct tagTHREADNAME_INFO { DWORD dwType; LPCSTR szName; DWORD dwThreadID; DWORD dwFlags; } THREADNAME_INFO; void SetThreadName( DWORD dwThreadID, LPCSTR szThreadName) { THREADNAME_INFO info; { info.dwType = 0x1000; info.szName = szThreadName; info.dwThreadID = dwThreadID; info.dwFlags = 0; } __try { RaiseException(MSDEV_SET_THREAD_NAME, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info ); } __except(EXCEPTION_CONTINUE_EXECUTION) { } } void Thread::Start() { DWORD flags = 0; if (priority_ != PRIORITY_NORMAL) { flags = CREATE_SUSPENDED; } thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, this, flags, NULL); if (thread_) { if (priority_ != PRIORITY_NORMAL) { if (priority_ == PRIORITY_IDLE) { ::SetThreadPriority(thread_, THREAD_PRIORITY_IDLE); } ::ResumeThread(thread_); } } started_ = true; } void Thread::Join() { if (started_) { WaitForSingleObject(thread_, INFINITE); CloseHandle(thread_); started_ = false; } } #endif void *Thread::PreRun(void *pv) { Thread *thread = (Thread *)pv; ThreadManager::SetCurrent(thread); #if defined(WIN32) && defined(_DEBUG) char buf[256]; _snprintf(buf, sizeof(buf), "Thread 0x%.8x", thread); SetThreadName(GetCurrentThreadId(), buf); #endif thread->Run(); return NULL; } void Thread::Run() { ProcessMessages(kForever); } void Thread::Stop() { MessageQueue::Stop(); Join(); } void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { if (fStop_) return; // Sent messages are sent to the MessageHandler directly, in the context // of "thread", like Win32 SendMessage. If in the right context, // call the handler directly. Message msg; msg.phandler = phandler; msg.message_id = id; msg.pdata = pdata; if (IsCurrent()) { phandler->OnMessage(&msg); return; } AutoThread thread; Thread *current_thread = Thread::Current(); ASSERT(current_thread != NULL); // AutoThread ensures this bool ready = false; { CritScope cs(&crit_); EnsureActive(); _SendMessage smsg; smsg.thread = current_thread; smsg.msg = msg; smsg.ready = &ready; sendlist_.push_back(smsg); has_sends_ = true; } // Wait for a reply ss_->WakeUp(); bool waited = false; while (!ready) { current_thread->ReceiveSends(); current_thread->socketserver()->Wait(kForever, false); waited = true; } // Our Wait loop above may have consumed some WakeUp events for this // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can // cause problems for some SocketServers. // // Concrete example: // Win32SocketServer on thread A calls Send on thread B. While processing the // message, thread B Posts a message to A. We consume the wakeup for that // Post while waiting for the Send to complete, which means that when we exit // this loop, we need to issue another WakeUp, or else the Posted message // won't be processed in a timely manner. if (waited) { current_thread->socketserver()->WakeUp(); } } void Thread::ReceiveSends() { // Before entering critical section, check boolean. if (!has_sends_) return; // Receive a sent message. Cleanup scenarios: // - thread sending exits: We don't allow this, since thread can exit // only via Join, so Send must complete. // - thread receiving exits: Wakeup/set ready in Thread::Clear() // - object target cleared: Wakeup/set ready in Thread::Clear() crit_.Enter(); while (!sendlist_.empty()) { _SendMessage smsg = sendlist_.front(); sendlist_.pop_front(); crit_.Leave(); smsg.msg.phandler->OnMessage(&smsg.msg); crit_.Enter(); *smsg.ready = true; smsg.thread->socketserver()->WakeUp(); } has_sends_ = false; crit_.Leave(); } void Thread::Clear(MessageHandler *phandler, uint32 id) { CritScope cs(&crit_); // Remove messages on sendlist_ with phandler // Object target cleared: remove from send list, wakeup/set ready // if sender not NULL. std::list<_SendMessage>::iterator iter = sendlist_.begin(); while (iter != sendlist_.end()) { _SendMessage smsg = *iter; if (phandler == NULL || smsg.msg.phandler == phandler) { if (id == (uint32)-1 || smsg.msg.message_id == id) { iter = sendlist_.erase(iter); *smsg.ready = true; smsg.thread->socketserver()->WakeUp(); continue; } } ++iter; } MessageQueue::Clear(phandler, id); } bool Thread::ProcessMessages(int cmsLoop) { uint32 msEnd; if (cmsLoop != kForever) msEnd = GetMillisecondCount() + cmsLoop; int cmsNext = cmsLoop; while (true) { Message msg; if (!Get(&msg, cmsNext)) return false; Dispatch(&msg); if (cmsLoop != kForever) { uint32 msCur = GetMillisecondCount(); if (msCur >= msEnd) return true; cmsNext = msEnd - msCur; } } } AutoThread::AutoThread(SocketServer* ss) : Thread(ss) { if (!ThreadManager::CurrentThread()) { ThreadManager::SetCurrent(this); } } AutoThread::~AutoThread() { if (ThreadManager::CurrentThread() == this) { ThreadManager::SetCurrent(NULL); } } } // namespace talk_base