summaryrefslogtreecommitdiff
path: root/plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h')
-rw-r--r--plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h933
1 files changed, 0 insertions, 933 deletions
diff --git a/plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h b/plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h
deleted file mode 100644
index e570411ec4..0000000000
--- a/plugins/Dbx_kyoto/src/kyotocabinet/kcthread.h
+++ /dev/null
@@ -1,933 +0,0 @@
-/*************************************************************************************************
- * Threading devices
- * Copyright (C) 2009-2012 FAL Labs
- * This file is part of Kyoto Cabinet.
- * This program is free software: you can redistribute it and/or modify it under the terms of
- * the GNU General Public License as published by the Free Software Foundation, either version
- * 3 of the License, or any later version.
- * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- * See the GNU General Public License for more details.
- * You should have received a copy of the GNU General Public License along with this program.
- * If not, see <http://www.gnu.org/licenses/>.
- *************************************************************************************************/
-
-
-#ifndef _KCTHREAD_H // duplication check
-#define _KCTHREAD_H
-
-#include <kccommon.h>
-#include <kcutil.h>
-
-namespace kyotocabinet { // common namespace
-
-
-/**
- * Threading device.
- */
-class Thread {
- public:
- /**
- * Default constructor.
- */
- explicit Thread();
- /**
- * Destructor.
- */
- virtual ~Thread();
- /**
- * Perform the concrete process.
- */
- virtual void run() = 0;
- /**
- * Start the thread.
- */
- void start();
- /**
- * Wait for the thread to finish.
- */
- void join();
- /**
- * Put the thread in the detached state.
- */
- void detach();
- /**
- * Terminate the running thread.
- */
- static void exit();
- /**
- * Yield the processor from the current thread.
- */
- static void yield();
- /**
- * Chill the processor by suspending execution for a quick moment.
- */
- static void chill();
- /**
- * Suspend execution of the current thread.
- * @param sec the interval of the suspension in seconds.
- * @return true on success, or false on failure.
- */
- static bool sleep(double sec);
- /**
- * Get the hash value of the current thread.
- * @return the hash value of the current thread.
- */
- static int64_t hash();
- private:
- /** Dummy constructor to forbid the use. */
- Thread(const Thread&);
- /** Dummy Operator to forbid the use. */
- Thread& operator =(const Thread&);
- /** Opaque pointer. */
- void* opq_;
-};
-
-
-/**
- * Basic mutual exclusion device.
- */
-class Mutex {
- friend class CondVar;
- public:
- /**
- * Type of the behavior for double locking.
- */
- enum Type {
- FAST, ///< no operation
- ERRORCHECK, ///< check error
- RECURSIVE ///< allow recursive locking
- };
- /**
- * Default constructor.
- */
- explicit Mutex();
- /**
- * Constructor.
- * @param type the behavior for double locking.
- */
- explicit Mutex(Type type);
- /**
- * Destructor.
- */
- ~Mutex();
- /**
- * Get the lock.
- */
- void lock();
- /**
- * Try to get the lock.
- * @return true on success, or false on failure.
- */
- bool lock_try();
- /**
- * Try to get the lock.
- * @param sec the interval of the suspension in seconds.
- * @return true on success, or false on failure.
- */
- bool lock_try(double sec);
- /**
- * Release the lock.
- */
- void unlock();
- private:
- /** Dummy constructor to forbid the use. */
- Mutex(const Mutex&);
- /** Dummy Operator to forbid the use. */
- Mutex& operator =(const Mutex&);
- /** Opaque pointer. */
- void* opq_;
-};
-
-
-/**
- * Scoped mutex device.
- */
-class ScopedMutex {
- public:
- /**
- * Constructor.
- * @param mutex a mutex to lock the block.
- */
- explicit ScopedMutex(Mutex* mutex) : mutex_(mutex) {
- _assert_(mutex);
- mutex_->lock();
- }
- /**
- * Destructor.
- */
- ~ScopedMutex() {
- _assert_(true);
- mutex_->unlock();
- }
- private:
- /** Dummy constructor to forbid the use. */
- ScopedMutex(const ScopedMutex&);
- /** Dummy Operator to forbid the use. */
- ScopedMutex& operator =(const ScopedMutex&);
- /** The inner device. */
- Mutex* mutex_;
-};
-
-
-/**
- * Slotted mutex device.
- */
-class SlottedMutex {
- public:
- /**
- * Constructor.
- * @param slotnum the number of slots.
- */
- explicit SlottedMutex(size_t slotnum);
- /**
- * Destructor.
- */
- ~SlottedMutex();
- /**
- * Get the lock of a slot.
- * @param idx the index of a slot.
- */
- void lock(size_t idx);
- /**
- * Release the lock of a slot.
- * @param idx the index of a slot.
- */
- void unlock(size_t idx);
- /**
- * Get the locks of all slots.
- */
- void lock_all();
- /**
- * Release the locks of all slots.
- */
- void unlock_all();
- private:
- /** Opaque pointer. */
- void* opq_;
-};
-
-
-/**
- * Condition variable.
- */
-class CondVar {
- public:
- /**
- * Default constructor.
- */
- explicit CondVar();
- /**
- * Destructor.
- */
- ~CondVar();
- /**
- * Wait for the signal.
- * @param mutex a locked mutex.
- */
- void wait(Mutex* mutex);
- /**
- * Wait for the signal.
- * @param mutex a locked mutex.
- * @param sec the interval of the suspension in seconds.
- * @return true on catched signal, or false on timeout.
- */
- bool wait(Mutex* mutex, double sec);
- /**
- * Send the wake-up signal to another waiting thread.
- * @note The mutex used for the wait method should be locked by the caller.
- */
- void signal();
- /**
- * Send the wake-up signals to all waiting threads.
- * @note The mutex used for the wait method should be locked by the caller.
- */
- void broadcast();
- private:
- /** Dummy constructor to forbid the use. */
- CondVar(const CondVar&);
- /** Dummy Operator to forbid the use. */
- CondVar& operator =(const CondVar&);
- /** Opaque pointer. */
- void* opq_;
-};
-
-
-/**
- * Assosiative condition variable.
- */
-class CondMap {
- private:
- struct Count;
- struct Slot;
- /** An alias of set of counters. */
- typedef std::map<std::string, Count> CountMap;
- /** The number of slots. */
- static const size_t SLOTNUM = 64;
- public:
- /**
- * Default constructor.
- */
- explicit CondMap() : slots_() {
- _assert_(true);
- }
- /**
- * Destructor.
- */
- ~CondMap() {
- _assert_(true);
- }
- /**
- * Wait for a signal.
- * @param kbuf the pointer to the key region.
- * @param ksiz the size of the key region.
- * @param sec the interval of the suspension in seconds. If it is negative, no timeout is
- * specified.
- * @return true on catched signal, or false on timeout.
- */
- bool wait(const char* kbuf, size_t ksiz, double sec = -1) {
- _assert_(kbuf && ksiz <= MEMMAXSIZ);
- std::string key(kbuf, ksiz);
- return wait(key, sec);
- }
- /**
- * Wait for a signal by a key.
- * @param key the key.
- * @param sec the interval of the suspension in seconds. If it is negative, no timeout is
- * specified.
- * @return true on catched signal, or false on timeout.
- */
- bool wait(const std::string& key, double sec = -1) {
- _assert_(true);
- double invtime = sec < 0 ? 1.0 : sec;
- double curtime = time();
- double endtime = curtime + (sec < 0 ? UINT32MAX : sec);
- Slot* slot = get_slot(key);
- while (curtime < endtime) {
- ScopedMutex lock(&slot->mutex);
- CountMap::iterator cit = slot->counter.find(key);
- if (cit == slot->counter.end()) {
- Count cnt = { 1, false };
- slot->counter[key] = cnt;
- } else {
- cit->second.num++;
- }
- slot->cond.wait(&slot->mutex, invtime);
- cit = slot->counter.find(key);
- cit->second.num--;
- if (cit->second.wake > 0) {
- cit->second.wake--;
- if (cit->second.num < 1) slot->counter.erase(cit);
- return true;
- }
- if (cit->second.num < 1) slot->counter.erase(cit);
- curtime = time();
- }
- return false;
- }
- /**
- * Send a wake-up signal to another thread waiting by a key.
- * @param kbuf the pointer to the key region.
- * @param ksiz the size of the key region.
- * @return the number of threads waiting for the signal.
- */
- size_t signal(const char* kbuf, size_t ksiz) {
- _assert_(kbuf && ksiz <= MEMMAXSIZ);
- std::string key(kbuf, ksiz);
- return signal(key);
- }
- /**
- * Send a wake-up signal to another thread waiting by a key.
- * @param key the key.
- * @return the number of threads waiting for the signal.
- */
- size_t signal(const std::string& key) {
- _assert_(true);
- Slot* slot = get_slot(key);
- ScopedMutex lock(&slot->mutex);
- CountMap::iterator cit = slot->counter.find(key);
- if (cit == slot->counter.end() || cit->second.num < 1) return 0;
- if (cit->second.wake < cit->second.num) cit->second.wake++;
- slot->cond.broadcast();
- return cit->second.num;
- }
- /**
- * Send wake-up signals to all threads waiting by a key.
- * @param kbuf the pointer to the key region.
- * @param ksiz the size of the key region.
- * @return the number of threads waiting for the signal.
- */
- size_t broadcast(const char* kbuf, size_t ksiz) {
- _assert_(kbuf && ksiz <= MEMMAXSIZ);
- std::string key(kbuf, ksiz);
- return broadcast(key);
- }
- /**
- * Send wake-up signals to all threads waiting by a key.
- * @param key the key.
- * @return the number of threads waiting for the signal.
- */
- size_t broadcast(const std::string& key) {
- _assert_(true);
- Slot* slot = get_slot(key);
- ScopedMutex lock(&slot->mutex);
- CountMap::iterator cit = slot->counter.find(key);
- if (cit == slot->counter.end() || cit->second.num < 1) return 0;
- cit->second.wake = cit->second.num;
- slot->cond.broadcast();
- return cit->second.num;
- }
- /**
- * Send wake-up signals to all threads waiting by each key.
- * @return the number of threads waiting for the signal.
- */
- size_t broadcast_all() {
- _assert_(true);
- size_t sum = 0;
- for (size_t i = 0; i < SLOTNUM; i++) {
- Slot* slot = slots_ + i;
- ScopedMutex lock(&slot->mutex);
- CountMap::iterator cit = slot->counter.begin();
- CountMap::iterator citend = slot->counter.end();
- while (cit != citend) {
- if (cit->second.num > 0) {
- cit->second.wake = cit->second.num;
- sum += cit->second.num;
- }
- slot->cond.broadcast();
- ++cit;
- }
- }
- return sum;
- }
- /**
- * Get the total number of threads waiting for signals.
- * @return the total number of threads waiting for signals.
- */
- size_t count() {
- _assert_(true);
- size_t sum = 0;
- for (size_t i = 0; i < SLOTNUM; i++) {
- Slot* slot = slots_ + i;
- ScopedMutex lock(&slot->mutex);
- CountMap::iterator cit = slot->counter.begin();
- CountMap::iterator citend = slot->counter.end();
- while (cit != citend) {
- sum += cit->second.num;
- ++cit;
- }
- }
- return sum;
- }
- private:
- /**
- * Counter for waiting threads.
- */
- struct Count {
- size_t num; ///< waiting threads
- size_t wake; ///< waking threads
- };
- /**
- * Slot of a key space.
- */
- struct Slot {
- CondVar cond; ///< condition variable
- Mutex mutex; ///< mutex
- CountMap counter; ///< counter
- };
- /**
- * Get the slot corresponding a key.
- * @param key the key.
- * @return the slot corresponding the key.
- */
- Slot* get_slot(const std::string& key) {
- return slots_ + hashmurmur(key.data(), key.size()) % SLOTNUM;
- }
- /** The slot array. */
- Slot slots_[SLOTNUM];
-};
-
-
-/**
- * Key of thread specific data.
- */
-class TSDKey {
- public:
- /**
- * Default constructor.
- */
- explicit TSDKey();
- /**
- * Constructor.
- * @param dstr the destructor for the value.
- */
- explicit TSDKey(void (*dstr)(void*));
- /**
- * Destructor.
- */
- ~TSDKey();
- /**
- * Set the value.
- * @param ptr an arbitrary pointer.
- */
- void set(void* ptr);
- /**
- * Get the value.
- * @return the value.
- */
- void* get() const ;
- private:
- /** Opaque pointer. */
- void* opq_;
-};
-
-
-/**
- * Smart pointer to thread specific data.
- */
-template <class TYPE>
-class TSD {
- public:
- /**
- * Default constructor.
- */
- explicit TSD() : key_(delete_value) {
- _assert_(true);
- }
- /**
- * Destructor.
- */
- ~TSD() {
- _assert_(true);
- TYPE* obj = (TYPE*)key_.get();
- if (obj) {
- delete obj;
- key_.set(NULL);
- }
- }
- /**
- * Dereference operator.
- * @return the reference to the inner object.
- */
- TYPE& operator *() {
- _assert_(true);
- TYPE* obj = (TYPE*)key_.get();
- if (!obj) {
- obj = new TYPE;
- key_.set(obj);
- }
- return *obj;
- }
- /**
- * Member reference operator.
- * @return the pointer to the inner object.
- */
- TYPE* operator ->() {
- _assert_(true);
- TYPE* obj = (TYPE*)key_.get();
- if (!obj) {
- obj = new TYPE;
- key_.set(obj);
- }
- return obj;
- }
- /**
- * Cast operator to the original type.
- * @return the copy of the inner object.
- */
- operator TYPE() const {
- _assert_(true);
- TYPE* obj = (TYPE*)key_.get();
- if (!obj) return TYPE();
- return *obj;
- }
- private:
- /**
- * Delete the inner object.
- * @param obj the inner object.
- */
- static void delete_value(void* obj) {
- _assert_(true);
- delete (TYPE*)obj;
- }
- /** Dummy constructor to forbid the use. */
- TSD(const TSD&);
- /** Dummy Operator to forbid the use. */
- TSD& operator =(const TSD&);
- /** Key of thread specific data. */
- TSDKey key_;
-};
-
-
-/**
- * Integer with atomic operations.
- */
-class AtomicInt64 {
- public:
- /**
- * Default constructor.
- */
- explicit AtomicInt64() : value_(0) {
- _assert_(true);
- }
- /**
- * Copy constructor.
- * @param src the source object.
- */
- AtomicInt64(const AtomicInt64& src) : value_(src.get()) {
- _assert_(true);
- };
- /**
- * Constructor.
- * @param num the initial value.
- */
- AtomicInt64(int64_t num) : value_(num) {
- _assert_(true);
- }
- /**
- * Destructor.
- */
- ~AtomicInt64() {
- _assert_(true);
- }
- /**
- * Set the new value.
- * @param val the new value.
- * @return the old value.
- */
- int64_t set(int64_t val);
- /**
- * Add a value.
- * @param val the additional value.
- * @return the old value.
- */
- int64_t add(int64_t val);
- /**
- * Perform compare-and-swap.
- * @param oval the old value.
- * @param nval the new value.
- * @return true on success, or false on failure.
- */
- bool cas(int64_t oval, int64_t nval);
- /**
- * Get the current value.
- * @return the current value.
- */
- int64_t get() const;
- /**
- * Assignment operator from the self type.
- * @param right the right operand.
- * @return the reference to itself.
- */
- AtomicInt64& operator =(const AtomicInt64& right) {
- _assert_(true);
- if (&right == this) return *this;
- set(right.get());
- return *this;
- }
- /**
- * Assignment operator from integer.
- * @param right the right operand.
- * @return the reference to itself.
- */
- AtomicInt64& operator =(const int64_t& right) {
- _assert_(true);
- set(right);
- return *this;
- }
- /**
- * Cast operator to integer.
- * @return the current value.
- */
- operator int64_t() const {
- _assert_(true);
- return get();
- }
- /**
- * Summation assignment operator by integer.
- * @param right the right operand.
- * @return the reference to itself.
- */
- AtomicInt64& operator +=(int64_t right) {
- _assert_(true);
- add(right);
- return *this;
- }
- /**
- * Subtraction assignment operator by integer.
- * @param right the right operand.
- * @return the reference to itself.
- */
- AtomicInt64& operator -=(int64_t right) {
- _assert_(true);
- add(-right);
- return *this;
- }
- /**
- * Secure the least value
- * @param val the least value
- * @return the current value.
- */
- int64_t secure_least(int64_t val) {
- _assert_(true);
- while (true) {
- int64_t cur = get();
- if (cur >= val) return cur;
- if (cas(cur, val)) break;
- }
- return val;
- }
- private:
- /** The value. */
- volatile int64_t value_;
-};
-
-
-/**
- * Task queue device.
- */
-class TaskQueue {
- public:
- class Task;
- private:
- class WorkerThread;
- /** An alias of list of tasks. */
- typedef std::list<Task*> TaskList;
- public:
- /**
- * Interface of a task.
- */
- class Task {
- friend class TaskQueue;
- public:
- /**
- * Default constructor.
- */
- explicit Task() : id_(0), thid_(0), aborted_(false) {
- _assert_(true);
- }
- /**
- * Destructor.
- */
- virtual ~Task() {
- _assert_(true);
- }
- /**
- * Get the ID number of the task.
- * @return the ID number of the task, which is incremented from 1.
- */
- uint64_t id() const {
- _assert_(true);
- return id_;
- }
- /**
- * Get the ID number of the worker thread.
- * @return the ID number of the worker thread. It is from 0 to less than the number of
- * worker threads.
- */
- uint32_t thread_id() const {
- _assert_(true);
- return thid_;
- }
- /**
- * Check whether the thread is to be aborted.
- * @return true if the thread is to be aborted, or false if not.
- */
- bool aborted() const {
- _assert_(true);
- return aborted_;
- }
- private:
- /** The task ID number. */
- uint64_t id_;
- /** The thread ID number. */
- uint64_t thid_;
- /** The flag to be aborted. */
- bool aborted_;
- };
- /**
- * Default Constructor.
- */
- TaskQueue() : thary_(NULL), thnum_(0), tasks_(), count_(0), mutex_(), cond_(), seed_(0) {
- _assert_(true);
- }
- /**
- * Destructor.
- */
- virtual ~TaskQueue() {
- _assert_(true);
- }
- /**
- * Process a task.
- * @param task a task object.
- */
- virtual void do_task(Task* task) = 0;
- /**
- * Process the starting event.
- * @param task a task object.
- * @note This is called for each thread on starting.
- */
- virtual void do_start(const Task* task) {
- _assert_(true);
- }
- /**
- * Process the finishing event.
- * @param task a task object.
- * @note This is called for each thread on finishing.
- */
- virtual void do_finish(const Task* task) {
- _assert_(true);
- }
- /**
- * Start the task queue.
- * @param thnum the number of worker threads.
- */
- void start(size_t thnum) {
- _assert_(thnum > 0 && thnum <= MEMMAXSIZ);
- thary_ = new WorkerThread[thnum];
- for (size_t i = 0; i < thnum; i++) {
- thary_[i].id_ = i;
- thary_[i].queue_ = this;
- thary_[i].start();
- }
- thnum_ = thnum;
- }
- /**
- * Finish the task queue.
- * @note This function blocks until all tasks in the queue are popped.
- */
- void finish() {
- _assert_(true);
- mutex_.lock();
- TaskList::iterator it = tasks_.begin();
- TaskList::iterator itend = tasks_.end();
- while (it != itend) {
- Task* task = *it;
- task->aborted_ = true;
- ++it;
- }
- cond_.broadcast();
- mutex_.unlock();
- Thread::yield();
- for (double wsec = 1.0 / CLOCKTICK; true; wsec *= 2) {
- mutex_.lock();
- if (tasks_.empty()) {
- mutex_.unlock();
- break;
- }
- mutex_.unlock();
- if (wsec > 1.0) wsec = 1.0;
- Thread::sleep(wsec);
- }
- mutex_.lock();
- for (size_t i = 0; i < thnum_; i++) {
- thary_[i].aborted_ = true;
- }
- cond_.broadcast();
- mutex_.unlock();
- for (size_t i = 0; i < thnum_; i++) {
- thary_[i].join();
- }
- delete[] thary_;
- }
- /**
- * Add a task.
- * @param task a task object.
- * @return the number of tasks in the queue.
- */
- int64_t add_task(Task* task) {
- _assert_(task);
- mutex_.lock();
- task->id_ = ++seed_;
- tasks_.push_back(task);
- int64_t count = ++count_;
- cond_.signal();
- mutex_.unlock();
- return count;
- }
- /**
- * Get the number of tasks in the queue.
- * @return the number of tasks in the queue.
- */
- int64_t count() {
- _assert_(true);
- mutex_.lock();
- int64_t count = count_;
- mutex_.unlock();
- return count;
- }
- private:
- /**
- * Implementation of the worker thread.
- */
- class WorkerThread : public Thread {
- friend class TaskQueue;
- public:
- explicit WorkerThread() : id_(0), queue_(NULL), aborted_(false) {
- _assert_(true);
- }
- private:
- void run() {
- _assert_(true);
- Task* stask = new Task;
- stask->thid_ = id_;
- queue_->do_start(stask);
- delete stask;
- bool empty = false;
- while (true) {
- queue_->mutex_.lock();
- if (aborted_) {
- queue_->mutex_.unlock();
- break;
- }
- if (empty) queue_->cond_.wait(&queue_->mutex_, 1.0);
- Task * task = NULL;
- if (queue_->tasks_.empty()) {
- empty = true;
- } else {
- task = queue_->tasks_.front();
- task->thid_ = id_;
- queue_->tasks_.pop_front();
- queue_->count_--;
- empty = false;
- }
- queue_->mutex_.unlock();
- if (task) queue_->do_task(task);
- }
- Task* ftask = new Task;
- ftask->thid_ = id_;
- ftask->aborted_ = true;
- queue_->do_finish(ftask);
- delete ftask;
- }
- uint32_t id_;
- TaskQueue* queue_;
- Task* task_;
- bool aborted_;
- };
- /** Dummy constructor to forbid the use. */
- TaskQueue(const TaskQueue&);
- /** Dummy Operator to forbid the use. */
- TaskQueue& operator =(const TaskQueue&);
- /** The array of worker threads. */
- WorkerThread* thary_;
- /** The number of worker threads. */
- size_t thnum_;
- /** The list of tasks. */
- TaskList tasks_;
- /** The number of the tasks. */
- int64_t count_;
- /** The mutex for the task list. */
- Mutex mutex_;
- /** The condition variable for the task list. */
- CondVar cond_;
- /** The seed of ID numbers. */
- uint64_t seed_;
-};
-
-
-} // common namespace
-
-#endif // duplication check
-
-// END OF FILE