/************************************************************************************************* * Threading devices * Copyright (C) 2009-2012 FAL Labs * This file is part of Kyoto Cabinet. * This program is free software: you can redistribute it and/or modify it under the terms of * the GNU General Public License as published by the Free Software Foundation, either version * 3 of the License, or any later version. * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU General Public License for more details. * You should have received a copy of the GNU General Public License along with this program. * If not, see . *************************************************************************************************/ #ifndef _KCTHREAD_H // duplication check #define _KCTHREAD_H #include #include namespace kyotocabinet { // common namespace /** * Threading device. */ class Thread { public: /** * Default constructor. */ explicit Thread(); /** * Destructor. */ virtual ~Thread(); /** * Perform the concrete process. */ virtual void run() = 0; /** * Start the thread. */ void start(); /** * Wait for the thread to finish. */ void join(); /** * Put the thread in the detached state. */ void detach(); /** * Terminate the running thread. */ static void exit(); /** * Yield the processor from the current thread. */ static void yield(); /** * Chill the processor by suspending execution for a quick moment. */ static void chill(); /** * Suspend execution of the current thread. * @param sec the interval of the suspension in seconds. * @return true on success, or false on failure. */ static bool sleep(double sec); /** * Get the hash value of the current thread. * @return the hash value of the current thread. */ static int64_t hash(); private: /** Dummy constructor to forbid the use. */ Thread(const Thread&); /** Dummy Operator to forbid the use. */ Thread& operator =(const Thread&); /** Opaque pointer. */ void* opq_; }; /** * Basic mutual exclusion device. */ class Mutex { friend class CondVar; public: /** * Type of the behavior for double locking. */ enum Type { FAST, ///< no operation ERRORCHECK, ///< check error RECURSIVE ///< allow recursive locking }; /** * Default constructor. */ explicit Mutex(); /** * Constructor. * @param type the behavior for double locking. */ explicit Mutex(Type type); /** * Destructor. */ ~Mutex(); /** * Get the lock. */ void lock(); /** * Try to get the lock. * @return true on success, or false on failure. */ bool lock_try(); /** * Try to get the lock. * @param sec the interval of the suspension in seconds. * @return true on success, or false on failure. */ bool lock_try(double sec); /** * Release the lock. */ void unlock(); private: /** Dummy constructor to forbid the use. */ Mutex(const Mutex&); /** Dummy Operator to forbid the use. */ Mutex& operator =(const Mutex&); /** Opaque pointer. */ void* opq_; }; /** * Scoped mutex device. */ class ScopedMutex { public: /** * Constructor. * @param mutex a mutex to lock the block. */ explicit ScopedMutex(Mutex* mutex) : mutex_(mutex) { _assert_(mutex); mutex_->lock(); } /** * Destructor. */ ~ScopedMutex() { _assert_(true); mutex_->unlock(); } private: /** Dummy constructor to forbid the use. */ ScopedMutex(const ScopedMutex&); /** Dummy Operator to forbid the use. */ ScopedMutex& operator =(const ScopedMutex&); /** The inner device. */ Mutex* mutex_; }; /** * Slotted mutex device. */ class SlottedMutex { public: /** * Constructor. * @param slotnum the number of slots. */ explicit SlottedMutex(size_t slotnum); /** * Destructor. */ ~SlottedMutex(); /** * Get the lock of a slot. * @param idx the index of a slot. */ void lock(size_t idx); /** * Release the lock of a slot. * @param idx the index of a slot. */ void unlock(size_t idx); /** * Get the locks of all slots. */ void lock_all(); /** * Release the locks of all slots. */ void unlock_all(); private: /** Opaque pointer. */ void* opq_; }; /** * Condition variable. */ class CondVar { public: /** * Default constructor. */ explicit CondVar(); /** * Destructor. */ ~CondVar(); /** * Wait for the signal. * @param mutex a locked mutex. */ void wait(Mutex* mutex); /** * Wait for the signal. * @param mutex a locked mutex. * @param sec the interval of the suspension in seconds. * @return true on catched signal, or false on timeout. */ bool wait(Mutex* mutex, double sec); /** * Send the wake-up signal to another waiting thread. * @note The mutex used for the wait method should be locked by the caller. */ void signal(); /** * Send the wake-up signals to all waiting threads. * @note The mutex used for the wait method should be locked by the caller. */ void broadcast(); private: /** Dummy constructor to forbid the use. */ CondVar(const CondVar&); /** Dummy Operator to forbid the use. */ CondVar& operator =(const CondVar&); /** Opaque pointer. */ void* opq_; }; /** * Assosiative condition variable. */ class CondMap { private: struct Count; struct Slot; /** An alias of set of counters. */ typedef std::map CountMap; /** The number of slots. */ static const size_t SLOTNUM = 64; public: /** * Default constructor. */ explicit CondMap() : slots_() { _assert_(true); } /** * Destructor. */ ~CondMap() { _assert_(true); } /** * Wait for a signal. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param sec the interval of the suspension in seconds. If it is negative, no timeout is * specified. * @return true on catched signal, or false on timeout. */ bool wait(const char* kbuf, size_t ksiz, double sec = -1) { _assert_(kbuf && ksiz <= MEMMAXSIZ); std::string key(kbuf, ksiz); return wait(key, sec); } /** * Wait for a signal by a key. * @param key the key. * @param sec the interval of the suspension in seconds. If it is negative, no timeout is * specified. * @return true on catched signal, or false on timeout. */ bool wait(const std::string& key, double sec = -1) { _assert_(true); double invtime = sec < 0 ? 1.0 : sec; double curtime = time(); double endtime = curtime + (sec < 0 ? UINT32MAX : sec); Slot* slot = get_slot(key); while (curtime < endtime) { ScopedMutex lock(&slot->mutex); CountMap::iterator cit = slot->counter.find(key); if (cit == slot->counter.end()) { Count cnt = { 1, false }; slot->counter[key] = cnt; } else { cit->second.num++; } slot->cond.wait(&slot->mutex, invtime); cit = slot->counter.find(key); cit->second.num--; if (cit->second.wake > 0) { cit->second.wake--; if (cit->second.num < 1) slot->counter.erase(cit); return true; } if (cit->second.num < 1) slot->counter.erase(cit); curtime = time(); } return false; } /** * Send a wake-up signal to another thread waiting by a key. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @return the number of threads waiting for the signal. */ size_t signal(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); std::string key(kbuf, ksiz); return signal(key); } /** * Send a wake-up signal to another thread waiting by a key. * @param key the key. * @return the number of threads waiting for the signal. */ size_t signal(const std::string& key) { _assert_(true); Slot* slot = get_slot(key); ScopedMutex lock(&slot->mutex); CountMap::iterator cit = slot->counter.find(key); if (cit == slot->counter.end() || cit->second.num < 1) return 0; if (cit->second.wake < cit->second.num) cit->second.wake++; slot->cond.broadcast(); return cit->second.num; } /** * Send wake-up signals to all threads waiting by a key. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @return the number of threads waiting for the signal. */ size_t broadcast(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); std::string key(kbuf, ksiz); return broadcast(key); } /** * Send wake-up signals to all threads waiting by a key. * @param key the key. * @return the number of threads waiting for the signal. */ size_t broadcast(const std::string& key) { _assert_(true); Slot* slot = get_slot(key); ScopedMutex lock(&slot->mutex); CountMap::iterator cit = slot->counter.find(key); if (cit == slot->counter.end() || cit->second.num < 1) return 0; cit->second.wake = cit->second.num; slot->cond.broadcast(); return cit->second.num; } /** * Send wake-up signals to all threads waiting by each key. * @return the number of threads waiting for the signal. */ size_t broadcast_all() { _assert_(true); size_t sum = 0; for (size_t i = 0; i < SLOTNUM; i++) { Slot* slot = slots_ + i; ScopedMutex lock(&slot->mutex); CountMap::iterator cit = slot->counter.begin(); CountMap::iterator citend = slot->counter.end(); while (cit != citend) { if (cit->second.num > 0) { cit->second.wake = cit->second.num; sum += cit->second.num; } slot->cond.broadcast(); ++cit; } } return sum; } /** * Get the total number of threads waiting for signals. * @return the total number of threads waiting for signals. */ size_t count() { _assert_(true); size_t sum = 0; for (size_t i = 0; i < SLOTNUM; i++) { Slot* slot = slots_ + i; ScopedMutex lock(&slot->mutex); CountMap::iterator cit = slot->counter.begin(); CountMap::iterator citend = slot->counter.end(); while (cit != citend) { sum += cit->second.num; ++cit; } } return sum; } private: /** * Counter for waiting threads. */ struct Count { size_t num; ///< waiting threads size_t wake; ///< waking threads }; /** * Slot of a key space. */ struct Slot { CondVar cond; ///< condition variable Mutex mutex; ///< mutex CountMap counter; ///< counter }; /** * Get the slot corresponding a key. * @param key the key. * @return the slot corresponding the key. */ Slot* get_slot(const std::string& key) { return slots_ + hashmurmur(key.data(), key.size()) % SLOTNUM; } /** The slot array. */ Slot slots_[SLOTNUM]; }; /** * Key of thread specific data. */ class TSDKey { public: /** * Default constructor. */ explicit TSDKey(); /** * Constructor. * @param dstr the destructor for the value. */ explicit TSDKey(void (*dstr)(void*)); /** * Destructor. */ ~TSDKey(); /** * Set the value. * @param ptr an arbitrary pointer. */ void set(void* ptr); /** * Get the value. * @return the value. */ void* get() const ; private: /** Opaque pointer. */ void* opq_; }; /** * Smart pointer to thread specific data. */ template class TSD { public: /** * Default constructor. */ explicit TSD() : key_(delete_value) { _assert_(true); } /** * Destructor. */ ~TSD() { _assert_(true); TYPE* obj = (TYPE*)key_.get(); if (obj) { delete obj; key_.set(NULL); } } /** * Dereference operator. * @return the reference to the inner object. */ TYPE& operator *() { _assert_(true); TYPE* obj = (TYPE*)key_.get(); if (!obj) { obj = new TYPE; key_.set(obj); } return *obj; } /** * Member reference operator. * @return the pointer to the inner object. */ TYPE* operator ->() { _assert_(true); TYPE* obj = (TYPE*)key_.get(); if (!obj) { obj = new TYPE; key_.set(obj); } return obj; } /** * Cast operator to the original type. * @return the copy of the inner object. */ operator TYPE() const { _assert_(true); TYPE* obj = (TYPE*)key_.get(); if (!obj) return TYPE(); return *obj; } private: /** * Delete the inner object. * @param obj the inner object. */ static void delete_value(void* obj) { _assert_(true); delete (TYPE*)obj; } /** Dummy constructor to forbid the use. */ TSD(const TSD&); /** Dummy Operator to forbid the use. */ TSD& operator =(const TSD&); /** Key of thread specific data. */ TSDKey key_; }; /** * Integer with atomic operations. */ class AtomicInt64 { public: /** * Default constructor. */ explicit AtomicInt64() : value_(0) { _assert_(true); } /** * Copy constructor. * @param src the source object. */ AtomicInt64(const AtomicInt64& src) : value_(src.get()) { _assert_(true); }; /** * Constructor. * @param num the initial value. */ AtomicInt64(int64_t num) : value_(num) { _assert_(true); } /** * Destructor. */ ~AtomicInt64() { _assert_(true); } /** * Set the new value. * @param val the new value. * @return the old value. */ int64_t set(int64_t val); /** * Add a value. * @param val the additional value. * @return the old value. */ int64_t add(int64_t val); /** * Perform compare-and-swap. * @param oval the old value. * @param nval the new value. * @return true on success, or false on failure. */ bool cas(int64_t oval, int64_t nval); /** * Get the current value. * @return the current value. */ int64_t get() const; /** * Assignment operator from the self type. * @param right the right operand. * @return the reference to itself. */ AtomicInt64& operator =(const AtomicInt64& right) { _assert_(true); if (&right == this) return *this; set(right.get()); return *this; } /** * Assignment operator from integer. * @param right the right operand. * @return the reference to itself. */ AtomicInt64& operator =(const int64_t& right) { _assert_(true); set(right); return *this; } /** * Cast operator to integer. * @return the current value. */ operator int64_t() const { _assert_(true); return get(); } /** * Summation assignment operator by integer. * @param right the right operand. * @return the reference to itself. */ AtomicInt64& operator +=(int64_t right) { _assert_(true); add(right); return *this; } /** * Subtraction assignment operator by integer. * @param right the right operand. * @return the reference to itself. */ AtomicInt64& operator -=(int64_t right) { _assert_(true); add(-right); return *this; } /** * Secure the least value * @param val the least value * @return the current value. */ int64_t secure_least(int64_t val) { _assert_(true); while (true) { int64_t cur = get(); if (cur >= val) return cur; if (cas(cur, val)) break; } return val; } private: /** The value. */ volatile int64_t value_; }; /** * Task queue device. */ class TaskQueue { public: class Task; private: class WorkerThread; /** An alias of list of tasks. */ typedef std::list TaskList; public: /** * Interface of a task. */ class Task { friend class TaskQueue; public: /** * Default constructor. */ explicit Task() : id_(0), thid_(0), aborted_(false) { _assert_(true); } /** * Destructor. */ virtual ~Task() { _assert_(true); } /** * Get the ID number of the task. * @return the ID number of the task, which is incremented from 1. */ uint64_t id() const { _assert_(true); return id_; } /** * Get the ID number of the worker thread. * @return the ID number of the worker thread. It is from 0 to less than the number of * worker threads. */ uint32_t thread_id() const { _assert_(true); return thid_; } /** * Check whether the thread is to be aborted. * @return true if the thread is to be aborted, or false if not. */ bool aborted() const { _assert_(true); return aborted_; } private: /** The task ID number. */ uint64_t id_; /** The thread ID number. */ uint64_t thid_; /** The flag to be aborted. */ bool aborted_; }; /** * Default Constructor. */ TaskQueue() : thary_(NULL), thnum_(0), tasks_(), count_(0), mutex_(), cond_(), seed_(0) { _assert_(true); } /** * Destructor. */ virtual ~TaskQueue() { _assert_(true); } /** * Process a task. * @param task a task object. */ virtual void do_task(Task* task) = 0; /** * Process the starting event. * @param task a task object. * @note This is called for each thread on starting. */ virtual void do_start(const Task* task) { _assert_(true); } /** * Process the finishing event. * @param task a task object. * @note This is called for each thread on finishing. */ virtual void do_finish(const Task* task) { _assert_(true); } /** * Start the task queue. * @param thnum the number of worker threads. */ void start(size_t thnum) { _assert_(thnum > 0 && thnum <= MEMMAXSIZ); thary_ = new WorkerThread[thnum]; for (size_t i = 0; i < thnum; i++) { thary_[i].id_ = i; thary_[i].queue_ = this; thary_[i].start(); } thnum_ = thnum; } /** * Finish the task queue. * @note This function blocks until all tasks in the queue are popped. */ void finish() { _assert_(true); mutex_.lock(); TaskList::iterator it = tasks_.begin(); TaskList::iterator itend = tasks_.end(); while (it != itend) { Task* task = *it; task->aborted_ = true; ++it; } cond_.broadcast(); mutex_.unlock(); Thread::yield(); for (double wsec = 1.0 / CLOCKTICK; true; wsec *= 2) { mutex_.lock(); if (tasks_.empty()) { mutex_.unlock(); break; } mutex_.unlock(); if (wsec > 1.0) wsec = 1.0; Thread::sleep(wsec); } mutex_.lock(); for (size_t i = 0; i < thnum_; i++) { thary_[i].aborted_ = true; } cond_.broadcast(); mutex_.unlock(); for (size_t i = 0; i < thnum_; i++) { thary_[i].join(); } delete[] thary_; } /** * Add a task. * @param task a task object. * @return the number of tasks in the queue. */ int64_t add_task(Task* task) { _assert_(task); mutex_.lock(); task->id_ = ++seed_; tasks_.push_back(task); int64_t count = ++count_; cond_.signal(); mutex_.unlock(); return count; } /** * Get the number of tasks in the queue. * @return the number of tasks in the queue. */ int64_t count() { _assert_(true); mutex_.lock(); int64_t count = count_; mutex_.unlock(); return count; } private: /** * Implementation of the worker thread. */ class WorkerThread : public Thread { friend class TaskQueue; public: explicit WorkerThread() : id_(0), queue_(NULL), aborted_(false) { _assert_(true); } private: void run() { _assert_(true); Task* stask = new Task; stask->thid_ = id_; queue_->do_start(stask); delete stask; bool empty = false; while (true) { queue_->mutex_.lock(); if (aborted_) { queue_->mutex_.unlock(); break; } if (empty) queue_->cond_.wait(&queue_->mutex_, 1.0); Task * task = NULL; if (queue_->tasks_.empty()) { empty = true; } else { task = queue_->tasks_.front(); task->thid_ = id_; queue_->tasks_.pop_front(); queue_->count_--; empty = false; } queue_->mutex_.unlock(); if (task) queue_->do_task(task); } Task* ftask = new Task; ftask->thid_ = id_; ftask->aborted_ = true; queue_->do_finish(ftask); delete ftask; } uint32_t id_; TaskQueue* queue_; Task* task_; bool aborted_; }; /** Dummy constructor to forbid the use. */ TaskQueue(const TaskQueue&); /** Dummy Operator to forbid the use. */ TaskQueue& operator =(const TaskQueue&); /** The array of worker threads. */ WorkerThread* thary_; /** The number of worker threads. */ size_t thnum_; /** The list of tasks. */ TaskList tasks_; /** The number of the tasks. */ int64_t count_; /** The mutex for the task list. */ Mutex mutex_; /** The condition variable for the task list. */ CondVar cond_; /** The seed of ID numbers. */ uint64_t seed_; }; } // common namespace #endif // duplication check // END OF FILE