diff options
Diffstat (limited to 'plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h')
-rw-r--r-- | plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h | 933 |
1 files changed, 0 insertions, 933 deletions
diff --git a/plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h b/plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h deleted file mode 100644 index e570411ec4..0000000000 --- a/plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h +++ /dev/null @@ -1,933 +0,0 @@ -/************************************************************************************************* - * Threading devices - * Copyright (C) 2009-2012 FAL Labs - * This file is part of Kyoto Cabinet. - * This program is free software: you can redistribute it and/or modify it under the terms of - * the GNU General Public License as published by the Free Software Foundation, either version - * 3 of the License, or any later version. - * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. - * You should have received a copy of the GNU General Public License along with this program. - * If not, see <http://www.gnu.org/licenses/>. - *************************************************************************************************/ - - -#ifndef _KCTHREAD_H // duplication check -#define _KCTHREAD_H - -#include <kccommon.h> -#include <kcutil.h> - -namespace kyotocabinet { // common namespace - - -/** - * Threading device. - */ -class Thread { - public: - /** - * Default constructor. - */ - explicit Thread(); - /** - * Destructor. - */ - virtual ~Thread(); - /** - * Perform the concrete process. - */ - virtual void run() = 0; - /** - * Start the thread. - */ - void start(); - /** - * Wait for the thread to finish. - */ - void join(); - /** - * Put the thread in the detached state. - */ - void detach(); - /** - * Terminate the running thread. - */ - static void exit(); - /** - * Yield the processor from the current thread. - */ - static void yield(); - /** - * Chill the processor by suspending execution for a quick moment. - */ - static void chill(); - /** - * Suspend execution of the current thread. - * @param sec the interval of the suspension in seconds. - * @return true on success, or false on failure. - */ - static bool sleep(double sec); - /** - * Get the hash value of the current thread. - * @return the hash value of the current thread. - */ - static int64_t hash(); - private: - /** Dummy constructor to forbid the use. */ - Thread(const Thread&); - /** Dummy Operator to forbid the use. */ - Thread& operator =(const Thread&); - /** Opaque pointer. */ - void* opq_; -}; - - -/** - * Basic mutual exclusion device. - */ -class Mutex { - friend class CondVar; - public: - /** - * Type of the behavior for double locking. - */ - enum Type { - FAST, ///< no operation - ERRORCHECK, ///< check error - RECURSIVE ///< allow recursive locking - }; - /** - * Default constructor. - */ - explicit Mutex(); - /** - * Constructor. - * @param type the behavior for double locking. - */ - explicit Mutex(Type type); - /** - * Destructor. - */ - ~Mutex(); - /** - * Get the lock. - */ - void lock(); - /** - * Try to get the lock. - * @return true on success, or false on failure. - */ - bool lock_try(); - /** - * Try to get the lock. - * @param sec the interval of the suspension in seconds. - * @return true on success, or false on failure. - */ - bool lock_try(double sec); - /** - * Release the lock. - */ - void unlock(); - private: - /** Dummy constructor to forbid the use. */ - Mutex(const Mutex&); - /** Dummy Operator to forbid the use. */ - Mutex& operator =(const Mutex&); - /** Opaque pointer. */ - void* opq_; -}; - - -/** - * Scoped mutex device. - */ -class ScopedMutex { - public: - /** - * Constructor. - * @param mutex a mutex to lock the block. - */ - explicit ScopedMutex(Mutex* mutex) : mutex_(mutex) { - _assert_(mutex); - mutex_->lock(); - } - /** - * Destructor. - */ - ~ScopedMutex() { - _assert_(true); - mutex_->unlock(); - } - private: - /** Dummy constructor to forbid the use. */ - ScopedMutex(const ScopedMutex&); - /** Dummy Operator to forbid the use. */ - ScopedMutex& operator =(const ScopedMutex&); - /** The inner device. */ - Mutex* mutex_; -}; - - -/** - * Slotted mutex device. - */ -class SlottedMutex { - public: - /** - * Constructor. - * @param slotnum the number of slots. - */ - explicit SlottedMutex(size_t slotnum); - /** - * Destructor. - */ - ~SlottedMutex(); - /** - * Get the lock of a slot. - * @param idx the index of a slot. - */ - void lock(size_t idx); - /** - * Release the lock of a slot. - * @param idx the index of a slot. - */ - void unlock(size_t idx); - /** - * Get the locks of all slots. - */ - void lock_all(); - /** - * Release the locks of all slots. - */ - void unlock_all(); - private: - /** Opaque pointer. */ - void* opq_; -}; - - -/** - * Condition variable. - */ -class CondVar { - public: - /** - * Default constructor. - */ - explicit CondVar(); - /** - * Destructor. - */ - ~CondVar(); - /** - * Wait for the signal. - * @param mutex a locked mutex. - */ - void wait(Mutex* mutex); - /** - * Wait for the signal. - * @param mutex a locked mutex. - * @param sec the interval of the suspension in seconds. - * @return true on catched signal, or false on timeout. - */ - bool wait(Mutex* mutex, double sec); - /** - * Send the wake-up signal to another waiting thread. - * @note The mutex used for the wait method should be locked by the caller. - */ - void signal(); - /** - * Send the wake-up signals to all waiting threads. - * @note The mutex used for the wait method should be locked by the caller. - */ - void broadcast(); - private: - /** Dummy constructor to forbid the use. */ - CondVar(const CondVar&); - /** Dummy Operator to forbid the use. */ - CondVar& operator =(const CondVar&); - /** Opaque pointer. */ - void* opq_; -}; - - -/** - * Assosiative condition variable. - */ -class CondMap { - private: - struct Count; - struct Slot; - /** An alias of set of counters. */ - typedef std::map<std::string, Count> CountMap; - /** The number of slots. */ - static const size_t SLOTNUM = 64; - public: - /** - * Default constructor. - */ - explicit CondMap() : slots_() { - _assert_(true); - } - /** - * Destructor. - */ - ~CondMap() { - _assert_(true); - } - /** - * Wait for a signal. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param sec the interval of the suspension in seconds. If it is negative, no timeout is - * specified. - * @return true on catched signal, or false on timeout. - */ - bool wait(const char* kbuf, size_t ksiz, double sec = -1) { - _assert_(kbuf && ksiz <= MEMMAXSIZ); - std::string key(kbuf, ksiz); - return wait(key, sec); - } - /** - * Wait for a signal by a key. - * @param key the key. - * @param sec the interval of the suspension in seconds. If it is negative, no timeout is - * specified. - * @return true on catched signal, or false on timeout. - */ - bool wait(const std::string& key, double sec = -1) { - _assert_(true); - double invtime = sec < 0 ? 1.0 : sec; - double curtime = time(); - double endtime = curtime + (sec < 0 ? UINT32MAX : sec); - Slot* slot = get_slot(key); - while (curtime < endtime) { - ScopedMutex lock(&slot->mutex); - CountMap::iterator cit = slot->counter.find(key); - if (cit == slot->counter.end()) { - Count cnt = { 1, false }; - slot->counter[key] = cnt; - } else { - cit->second.num++; - } - slot->cond.wait(&slot->mutex, invtime); - cit = slot->counter.find(key); - cit->second.num--; - if (cit->second.wake > 0) { - cit->second.wake--; - if (cit->second.num < 1) slot->counter.erase(cit); - return true; - } - if (cit->second.num < 1) slot->counter.erase(cit); - curtime = time(); - } - return false; - } - /** - * Send a wake-up signal to another thread waiting by a key. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @return the number of threads waiting for the signal. - */ - size_t signal(const char* kbuf, size_t ksiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ); - std::string key(kbuf, ksiz); - return signal(key); - } - /** - * Send a wake-up signal to another thread waiting by a key. - * @param key the key. - * @return the number of threads waiting for the signal. - */ - size_t signal(const std::string& key) { - _assert_(true); - Slot* slot = get_slot(key); - ScopedMutex lock(&slot->mutex); - CountMap::iterator cit = slot->counter.find(key); - if (cit == slot->counter.end() || cit->second.num < 1) return 0; - if (cit->second.wake < cit->second.num) cit->second.wake++; - slot->cond.broadcast(); - return cit->second.num; - } - /** - * Send wake-up signals to all threads waiting by a key. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @return the number of threads waiting for the signal. - */ - size_t broadcast(const char* kbuf, size_t ksiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ); - std::string key(kbuf, ksiz); - return broadcast(key); - } - /** - * Send wake-up signals to all threads waiting by a key. - * @param key the key. - * @return the number of threads waiting for the signal. - */ - size_t broadcast(const std::string& key) { - _assert_(true); - Slot* slot = get_slot(key); - ScopedMutex lock(&slot->mutex); - CountMap::iterator cit = slot->counter.find(key); - if (cit == slot->counter.end() || cit->second.num < 1) return 0; - cit->second.wake = cit->second.num; - slot->cond.broadcast(); - return cit->second.num; - } - /** - * Send wake-up signals to all threads waiting by each key. - * @return the number of threads waiting for the signal. - */ - size_t broadcast_all() { - _assert_(true); - size_t sum = 0; - for (size_t i = 0; i < SLOTNUM; i++) { - Slot* slot = slots_ + i; - ScopedMutex lock(&slot->mutex); - CountMap::iterator cit = slot->counter.begin(); - CountMap::iterator citend = slot->counter.end(); - while (cit != citend) { - if (cit->second.num > 0) { - cit->second.wake = cit->second.num; - sum += cit->second.num; - } - slot->cond.broadcast(); - ++cit; - } - } - return sum; - } - /** - * Get the total number of threads waiting for signals. - * @return the total number of threads waiting for signals. - */ - size_t count() { - _assert_(true); - size_t sum = 0; - for (size_t i = 0; i < SLOTNUM; i++) { - Slot* slot = slots_ + i; - ScopedMutex lock(&slot->mutex); - CountMap::iterator cit = slot->counter.begin(); - CountMap::iterator citend = slot->counter.end(); - while (cit != citend) { - sum += cit->second.num; - ++cit; - } - } - return sum; - } - private: - /** - * Counter for waiting threads. - */ - struct Count { - size_t num; ///< waiting threads - size_t wake; ///< waking threads - }; - /** - * Slot of a key space. - */ - struct Slot { - CondVar cond; ///< condition variable - Mutex mutex; ///< mutex - CountMap counter; ///< counter - }; - /** - * Get the slot corresponding a key. - * @param key the key. - * @return the slot corresponding the key. - */ - Slot* get_slot(const std::string& key) { - return slots_ + hashmurmur(key.data(), key.size()) % SLOTNUM; - } - /** The slot array. */ - Slot slots_[SLOTNUM]; -}; - - -/** - * Key of thread specific data. - */ -class TSDKey { - public: - /** - * Default constructor. - */ - explicit TSDKey(); - /** - * Constructor. - * @param dstr the destructor for the value. - */ - explicit TSDKey(void (*dstr)(void*)); - /** - * Destructor. - */ - ~TSDKey(); - /** - * Set the value. - * @param ptr an arbitrary pointer. - */ - void set(void* ptr); - /** - * Get the value. - * @return the value. - */ - void* get() const ; - private: - /** Opaque pointer. */ - void* opq_; -}; - - -/** - * Smart pointer to thread specific data. - */ -template <class TYPE> -class TSD { - public: - /** - * Default constructor. - */ - explicit TSD() : key_(delete_value) { - _assert_(true); - } - /** - * Destructor. - */ - ~TSD() { - _assert_(true); - TYPE* obj = (TYPE*)key_.get(); - if (obj) { - delete obj; - key_.set(NULL); - } - } - /** - * Dereference operator. - * @return the reference to the inner object. - */ - TYPE& operator *() { - _assert_(true); - TYPE* obj = (TYPE*)key_.get(); - if (!obj) { - obj = new TYPE; - key_.set(obj); - } - return *obj; - } - /** - * Member reference operator. - * @return the pointer to the inner object. - */ - TYPE* operator ->() { - _assert_(true); - TYPE* obj = (TYPE*)key_.get(); - if (!obj) { - obj = new TYPE; - key_.set(obj); - } - return obj; - } - /** - * Cast operator to the original type. - * @return the copy of the inner object. - */ - operator TYPE() const { - _assert_(true); - TYPE* obj = (TYPE*)key_.get(); - if (!obj) return TYPE(); - return *obj; - } - private: - /** - * Delete the inner object. - * @param obj the inner object. - */ - static void delete_value(void* obj) { - _assert_(true); - delete (TYPE*)obj; - } - /** Dummy constructor to forbid the use. */ - TSD(const TSD&); - /** Dummy Operator to forbid the use. */ - TSD& operator =(const TSD&); - /** Key of thread specific data. */ - TSDKey key_; -}; - - -/** - * Integer with atomic operations. - */ -class AtomicInt64 { - public: - /** - * Default constructor. - */ - explicit AtomicInt64() : value_(0) { - _assert_(true); - } - /** - * Copy constructor. - * @param src the source object. - */ - AtomicInt64(const AtomicInt64& src) : value_(src.get()) { - _assert_(true); - }; - /** - * Constructor. - * @param num the initial value. - */ - AtomicInt64(int64_t num) : value_(num) { - _assert_(true); - } - /** - * Destructor. - */ - ~AtomicInt64() { - _assert_(true); - } - /** - * Set the new value. - * @param val the new value. - * @return the old value. - */ - int64_t set(int64_t val); - /** - * Add a value. - * @param val the additional value. - * @return the old value. - */ - int64_t add(int64_t val); - /** - * Perform compare-and-swap. - * @param oval the old value. - * @param nval the new value. - * @return true on success, or false on failure. - */ - bool cas(int64_t oval, int64_t nval); - /** - * Get the current value. - * @return the current value. - */ - int64_t get() const; - /** - * Assignment operator from the self type. - * @param right the right operand. - * @return the reference to itself. - */ - AtomicInt64& operator =(const AtomicInt64& right) { - _assert_(true); - if (&right == this) return *this; - set(right.get()); - return *this; - } - /** - * Assignment operator from integer. - * @param right the right operand. - * @return the reference to itself. - */ - AtomicInt64& operator =(const int64_t& right) { - _assert_(true); - set(right); - return *this; - } - /** - * Cast operator to integer. - * @return the current value. - */ - operator int64_t() const { - _assert_(true); - return get(); - } - /** - * Summation assignment operator by integer. - * @param right the right operand. - * @return the reference to itself. - */ - AtomicInt64& operator +=(int64_t right) { - _assert_(true); - add(right); - return *this; - } - /** - * Subtraction assignment operator by integer. - * @param right the right operand. - * @return the reference to itself. - */ - AtomicInt64& operator -=(int64_t right) { - _assert_(true); - add(-right); - return *this; - } - /** - * Secure the least value - * @param val the least value - * @return the current value. - */ - int64_t secure_least(int64_t val) { - _assert_(true); - while (true) { - int64_t cur = get(); - if (cur >= val) return cur; - if (cas(cur, val)) break; - } - return val; - } - private: - /** The value. */ - volatile int64_t value_; -}; - - -/** - * Task queue device. - */ -class TaskQueue { - public: - class Task; - private: - class WorkerThread; - /** An alias of list of tasks. */ - typedef std::list<Task*> TaskList; - public: - /** - * Interface of a task. - */ - class Task { - friend class TaskQueue; - public: - /** - * Default constructor. - */ - explicit Task() : id_(0), thid_(0), aborted_(false) { - _assert_(true); - } - /** - * Destructor. - */ - virtual ~Task() { - _assert_(true); - } - /** - * Get the ID number of the task. - * @return the ID number of the task, which is incremented from 1. - */ - uint64_t id() const { - _assert_(true); - return id_; - } - /** - * Get the ID number of the worker thread. - * @return the ID number of the worker thread. It is from 0 to less than the number of - * worker threads. - */ - uint32_t thread_id() const { - _assert_(true); - return thid_; - } - /** - * Check whether the thread is to be aborted. - * @return true if the thread is to be aborted, or false if not. - */ - bool aborted() const { - _assert_(true); - return aborted_; - } - private: - /** The task ID number. */ - uint64_t id_; - /** The thread ID number. */ - uint64_t thid_; - /** The flag to be aborted. */ - bool aborted_; - }; - /** - * Default Constructor. - */ - TaskQueue() : thary_(NULL), thnum_(0), tasks_(), count_(0), mutex_(), cond_(), seed_(0) { - _assert_(true); - } - /** - * Destructor. - */ - virtual ~TaskQueue() { - _assert_(true); - } - /** - * Process a task. - * @param task a task object. - */ - virtual void do_task(Task* task) = 0; - /** - * Process the starting event. - * @param task a task object. - * @note This is called for each thread on starting. - */ - virtual void do_start(const Task* task) { - _assert_(true); - } - /** - * Process the finishing event. - * @param task a task object. - * @note This is called for each thread on finishing. - */ - virtual void do_finish(const Task* task) { - _assert_(true); - } - /** - * Start the task queue. - * @param thnum the number of worker threads. - */ - void start(size_t thnum) { - _assert_(thnum > 0 && thnum <= MEMMAXSIZ); - thary_ = new WorkerThread[thnum]; - for (size_t i = 0; i < thnum; i++) { - thary_[i].id_ = i; - thary_[i].queue_ = this; - thary_[i].start(); - } - thnum_ = thnum; - } - /** - * Finish the task queue. - * @note This function blocks until all tasks in the queue are popped. - */ - void finish() { - _assert_(true); - mutex_.lock(); - TaskList::iterator it = tasks_.begin(); - TaskList::iterator itend = tasks_.end(); - while (it != itend) { - Task* task = *it; - task->aborted_ = true; - ++it; - } - cond_.broadcast(); - mutex_.unlock(); - Thread::yield(); - for (double wsec = 1.0 / CLOCKTICK; true; wsec *= 2) { - mutex_.lock(); - if (tasks_.empty()) { - mutex_.unlock(); - break; - } - mutex_.unlock(); - if (wsec > 1.0) wsec = 1.0; - Thread::sleep(wsec); - } - mutex_.lock(); - for (size_t i = 0; i < thnum_; i++) { - thary_[i].aborted_ = true; - } - cond_.broadcast(); - mutex_.unlock(); - for (size_t i = 0; i < thnum_; i++) { - thary_[i].join(); - } - delete[] thary_; - } - /** - * Add a task. - * @param task a task object. - * @return the number of tasks in the queue. - */ - int64_t add_task(Task* task) { - _assert_(task); - mutex_.lock(); - task->id_ = ++seed_; - tasks_.push_back(task); - int64_t count = ++count_; - cond_.signal(); - mutex_.unlock(); - return count; - } - /** - * Get the number of tasks in the queue. - * @return the number of tasks in the queue. - */ - int64_t count() { - _assert_(true); - mutex_.lock(); - int64_t count = count_; - mutex_.unlock(); - return count; - } - private: - /** - * Implementation of the worker thread. - */ - class WorkerThread : public Thread { - friend class TaskQueue; - public: - explicit WorkerThread() : id_(0), queue_(NULL), aborted_(false) { - _assert_(true); - } - private: - void run() { - _assert_(true); - Task* stask = new Task; - stask->thid_ = id_; - queue_->do_start(stask); - delete stask; - bool empty = false; - while (true) { - queue_->mutex_.lock(); - if (aborted_) { - queue_->mutex_.unlock(); - break; - } - if (empty) queue_->cond_.wait(&queue_->mutex_, 1.0); - Task * task = NULL; - if (queue_->tasks_.empty()) { - empty = true; - } else { - task = queue_->tasks_.front(); - task->thid_ = id_; - queue_->tasks_.pop_front(); - queue_->count_--; - empty = false; - } - queue_->mutex_.unlock(); - if (task) queue_->do_task(task); - } - Task* ftask = new Task; - ftask->thid_ = id_; - ftask->aborted_ = true; - queue_->do_finish(ftask); - delete ftask; - } - uint32_t id_; - TaskQueue* queue_; - Task* task_; - bool aborted_; - }; - /** Dummy constructor to forbid the use. */ - TaskQueue(const TaskQueue&); - /** Dummy Operator to forbid the use. */ - TaskQueue& operator =(const TaskQueue&); - /** The array of worker threads. */ - WorkerThread* thary_; - /** The number of worker threads. */ - size_t thnum_; - /** The list of tasks. */ - TaskList tasks_; - /** The number of the tasks. */ - int64_t count_; - /** The mutex for the task list. */ - Mutex mutex_; - /** The condition variable for the task list. */ - CondVar cond_; - /** The seed of ID numbers. */ - uint64_t seed_; -}; - - -} // common namespace - -#endif // duplication check - -// END OF FILE |