/*************************************************************************************************
* Threading devices
* Copyright (C) 2009-2012 FAL Labs
* This file is part of Kyoto Cabinet.
* This program is free software: you can redistribute it and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation, either version
* 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
* You should have received a copy of the GNU General Public License along with this program.
* If not, see <http://www.gnu.org/licenses/>.
*************************************************************************************************/
#ifndef _KCTHREAD_H // duplication check
#define _KCTHREAD_H
#include
#include
namespace kyotocabinet { // common namespace
/**
* Threading device.
* Abstract base class for a thread: run() is pure virtual, so a subclass must implement it and
* Thread itself cannot be instantiated.  Only declarations appear in this header; the method
* bodies are defined elsewhere.
* @note Copy construction and assignment are declared private to forbid copying.
*/
class Thread {
public:
/**
* Default constructor.
*/
explicit Thread();
/**
* Destructor.
* @note Virtual, so a derived object can be destroyed through a pointer to Thread.
*/
virtual ~Thread();
/**
* Perform the concrete process.
* @note Pure virtual: every concrete subclass must provide the body.
*/
virtual void run() = 0;
/**
* Start the thread.
* @note NOTE(review): presumably this executes run() on the new thread; the implementation is
* not visible in this header -- confirm.
*/
void start();
/**
* Wait for the thread to finish.
*/
void join();
/**
* Put the thread in the detached state.
*/
void detach();
/**
* Terminate the running thread.
* @note Static: it takes no Thread object.
*/
static void exit();
/**
* Yield the processor from the current thread.
*/
static void yield();
/**
* Chill the processor by suspending execution for a quick moment.
*/
static void chill();
/**
* Suspend execution of the current thread.
* @param sec the interval of the suspension in seconds.
* @return true on success, or false on failure.
*/
static bool sleep(double sec);
/**
* Get the hash value of the current thread.
* @return the hash value of the current thread.
*/
static int64_t hash();
private:
/** Copy constructor, declared private to forbid copying. */
Thread(const Thread&);
/** Assignment operator, declared private to forbid copying. */
Thread& operator =(const Thread&);
/** Opaque pointer to the implementation state; its concrete type is not visible here. */
void* opq_;
};
/**
* Basic mutual exclusion device.
* Only declarations appear in this header; the method bodies are defined elsewhere.
* @note Copy construction and assignment are declared private to forbid copying.
*/
class Mutex {
// CondVar is a friend; its wait() methods take a Mutex*.
// NOTE(review): presumably the friendship is needed to reach opq_ -- confirm in the implementation.
friend class CondVar;
public:
/**
* Type of the behavior for double locking.
*/
enum Type {
FAST, ///< no operation
ERRORCHECK, ///< check error
RECURSIVE ///< allow recursive locking
};
/**
* Default constructor.
* @note The double-locking behavior used by this constructor is decided by the implementation,
* which is not visible in this header.
*/
explicit Mutex();
/**
* Constructor.
* @param type the behavior for double locking.
*/
explicit Mutex(Type type);
/**
* Destructor.
*/
~Mutex();
/**
* Get the lock.
*/
void lock();
/**
* Try to get the lock.
* @return true on success, or false on failure.
*/
bool lock_try();
/**
* Try to get the lock.
* @param sec the interval of the suspension in seconds.
* NOTE(review): presumably the maximum time to keep trying -- confirm against the implementation.
* @return true on success, or false on failure.
*/
bool lock_try(double sec);
/**
* Release the lock.
*/
void unlock();
private:
/** Copy constructor, declared private to forbid copying. */
Mutex(const Mutex&);
/** Assignment operator, declared private to forbid copying. */
Mutex& operator =(const Mutex&);
/** Opaque pointer to the implementation state; its concrete type is not visible here. */
void* opq_;
};
/**
 * Scoped mutex device.
 * Keeps a mutex locked for the lifetime of the object: the constructor locks it and the
 * destructor unlocks it.
 * @note Copy construction and assignment are declared private to forbid copying.
 */
class ScopedMutex {
 public:
  /**
   * Constructor.  Locks the given mutex.
   * @param mutex a mutex to lock the block.
   */
  explicit ScopedMutex(Mutex* mutex) : mutex_(mutex) {
    _assert_(mutex);
    mutex_->lock();
  }
  /**
   * Destructor.  Unlocks the mutex given to the constructor.
   */
  ~ScopedMutex() {
    _assert_(true);
    mutex_->unlock();
  }
 private:
  /** The inner device. */
  Mutex* mutex_;
  /** Copy constructor, declared private to forbid copying. */
  ScopedMutex(const ScopedMutex&);
  /** Assignment operator, declared private to forbid copying. */
  ScopedMutex& operator =(const ScopedMutex&);
};
/**
* Slotted mutex device.
* Provides locking per slot index plus operations that lock or unlock every slot.  Only
* declarations appear in this header; the method bodies are defined elsewhere.
* NOTE(review): unlike Mutex and CondVar, this class does not declare a private copy
* constructor or assignment operator, so the compiler-generated ones would copy opq_ --
* confirm whether copying is meant to be allowed.
*/
class SlottedMutex {
public:
/**
* Constructor.
* @param slotnum the number of slots.
*/
explicit SlottedMutex(size_t slotnum);
/**
* Destructor.
*/
~SlottedMutex();
/**
* Get the lock of a slot.
* @param idx the index of a slot.
* NOTE(review): presumably idx must be less than the slot number given to the constructor --
* confirm whether the implementation checks it.
*/
void lock(size_t idx);
/**
* Release the lock of a slot.
* @param idx the index of a slot.
*/
void unlock(size_t idx);
/**
* Get the locks of all slots.
*/
void lock_all();
/**
* Release the locks of all slots.
*/
void unlock_all();
private:
/** Opaque pointer to the implementation state; its concrete type is not visible here. */
void* opq_;
};
/**
* Condition variable.
* Used together with a Mutex: the wait methods take the mutex, which the caller must have
* locked.  Only declarations appear in this header; the method bodies are defined elsewhere.
* @note Copy construction and assignment are declared private to forbid copying.
*/
class CondVar {
public:
/**
* Default constructor.
*/
explicit CondVar();
/**
* Destructor.
*/
~CondVar();
/**
* Wait for the signal.
* @param mutex a locked mutex.
*/
void wait(Mutex* mutex);
/**
* Wait for the signal with a time limit.
* @param mutex a locked mutex.
* @param sec the interval of the suspension in seconds.
* @return true on caught signal, or false on timeout.
*/
bool wait(Mutex* mutex, double sec);
/**
* Send the wake-up signal to another waiting thread.
* @note The mutex used for the wait method should be locked by the caller.
*/
void signal();
/**
* Send the wake-up signals to all waiting threads.
* @note The mutex used for the wait method should be locked by the caller.
*/
void broadcast();
private:
/** Copy constructor, declared private to forbid copying. */
CondVar(const CondVar&);
/** Assignment operator, declared private to forbid copying. */
CondVar& operator =(const CondVar&);
/** Opaque pointer to the implementation state; its concrete type is not visible here. */
void* opq_;
};
/**
* Assosiative condition variable.
*/
class CondMap {
private:
struct Count;
struct Slot;
/** An alias of set of counters. */
typedef std::map CountMap;
/** The number of slots. */
static const size_t SLOTNUM = 64;
public:
/**
* Default constructor.
*/
explicit CondMap() : slots_() {
_assert_(true);
}
/**
* Destructor.
*/
~CondMap() {
_assert_(true);
}
/**
* Wait for a signal.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param sec the interval of the suspension in seconds. If it is negative, no timeout is
* specified.
* @return true on catched signal, or false on timeout.
*/
bool wait(const char* kbuf, size_t ksiz, double sec = -1) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
std::string key(kbuf, ksiz);
return wait(key, sec);
}
/**
* Wait for a signal by a key.
* @param key the key.
* @param sec the interval of the suspension in seconds. If it is negative, no timeout is
* specified.
* @return true on catched signal, or false on timeout.
*/
bool wait(const std::string& key, double sec = -1) {
_assert_(true);
double invtime = sec < 0 ? 1.0 : sec;
double curtime = time();
double endtime = curtime + (sec < 0 ? UINT32MAX : sec);
Slot* slot = get_slot(key);
while (curtime < endtime) {
ScopedMutex lock(&slot->mutex);
CountMap::iterator cit = slot->counter.find(key);
if (cit == slot->counter.end()) {
Count cnt = { 1, false };
slot->counter[key] = cnt;
} else {
cit->second.num++;
}
slot->cond.wait(&slot->mutex, invtime);
cit = slot->counter.find(key);
cit->second.num--;
if (cit->second.wake > 0) {
cit->second.wake--;
if (cit->second.num < 1) slot->counter.erase(cit);
return true;
}
if (cit->second.num < 1) slot->counter.erase(cit);
curtime = time();
}
return false;
}
/**
* Send a wake-up signal to another thread waiting by a key.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return the number of threads waiting for the signal.
*/
size_t signal(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
std::string key(kbuf, ksiz);
return signal(key);
}
/**
* Send a wake-up signal to another thread waiting by a key.
* @param key the key.
* @return the number of threads waiting for the signal.
*/
size_t signal(const std::string& key) {
_assert_(true);
Slot* slot = get_slot(key);
ScopedMutex lock(&slot->mutex);
CountMap::iterator cit = slot->counter.find(key);
if (cit == slot->counter.end() || cit->second.num < 1) return 0;
if (cit->second.wake < cit->second.num) cit->second.wake++;
slot->cond.broadcast();
return cit->second.num;
}
/**
* Send wake-up signals to all threads waiting by a key.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return the number of threads waiting for the signal.
*/
size_t broadcast(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
std::string key(kbuf, ksiz);
return broadcast(key);
}
/**
* Send wake-up signals to all threads waiting by a key.
* @param key the key.
* @return the number of threads waiting for the signal.
*/
size_t broadcast(const std::string& key) {
_assert_(true);
Slot* slot = get_slot(key);
ScopedMutex lock(&slot->mutex);
CountMap::iterator cit = slot->counter.find(key);
if (cit == slot->counter.end() || cit->second.num < 1) return 0;
cit->second.wake = cit->second.num;
slot->cond.broadcast();
return cit->second.num;
}
/**
* Send wake-up signals to all threads waiting by each key.
* @return the number of threads waiting for the signal.
*/
size_t broadcast_all() {
_assert_(true);
size_t sum = 0;
for (size_t i = 0; i < SLOTNUM; i++) {
Slot* slot = slots_ + i;
ScopedMutex lock(&slot->mutex);
CountMap::iterator cit = slot->counter.begin();
CountMap::iterator citend = slot->counter.end();
while (cit != citend) {
if (cit->second.num > 0) {
cit->second.wake = cit->second.num;
sum += cit->second.num;
}
slot->cond.broadcast();
++cit;
}
}
return sum;
}
/**
* Get the total number of threads waiting for signals.
* @return the total number of threads waiting for signals.
*/
size_t count() {
_assert_(true);
size_t sum = 0;
for (size_t i = 0; i < SLOTNUM; i++) {
Slot* slot = slots_ + i;
ScopedMutex lock(&slot->mutex);
CountMap::iterator cit = slot->counter.begin();
CountMap::iterator citend = slot->counter.end();
while (cit != citend) {
sum += cit->second.num;
++cit;
}
}
return sum;
}
private:
/**
* Counter for waiting threads.
*/
struct Count {
size_t num; ///< waiting threads
size_t wake; ///< waking threads
};
/**
* Slot of a key space.
*/
struct Slot {
CondVar cond; ///< condition variable
Mutex mutex; ///< mutex
CountMap counter; ///< counter
};
/**
* Get the slot corresponding a key.
* @param key the key.
* @return the slot corresponding the key.
*/
Slot* get_slot(const std::string& key) {
return slots_ + hashmurmur(key.data(), key.size()) % SLOTNUM;
}
/** The slot array. */
Slot slots_[SLOTNUM];
};
/**
* Key of thread specific data.
* Stores and retrieves an arbitrary pointer through set() and get().  Only declarations appear
* in this header; the method bodies are defined elsewhere.
* NOTE(review): no private copy constructor or assignment operator is declared, so the
* compiler-generated ones would copy opq_ -- confirm whether copying is meant to be allowed.
*/
class TSDKey {
public:
/**
* Default constructor.
*/
explicit TSDKey();
/**
* Constructor.
* @param dstr the destructor for the value.
* NOTE(review): presumably called with the stored pointer when a thread ends -- confirm
* against the implementation.
*/
explicit TSDKey(void (*dstr)(void*));
/**
* Destructor.
*/
~TSDKey();
/**
* Set the value.
* @param ptr an arbitrary pointer.
*/
void set(void* ptr);
/**
* Get the value.
* @return the value.
*/
void* get() const ;
private:
/** Opaque pointer to the implementation state; its concrete type is not visible here. */
void* opq_;
};
/**
* Smart pointer to thread specific data.
*/
template
class TSD {
public:
/**
* Default constructor.
*/
explicit TSD() : key_(delete_value) {
_assert_(true);
}
/**
* Destructor.
*/
~TSD() {
_assert_(true);
TYPE* obj = (TYPE*)key_.get();
if (obj) {
delete obj;
key_.set(NULL);
}
}
/**
* Dereference operator.
* @return the reference to the inner object.
*/
TYPE& operator *() {
_assert_(true);
TYPE* obj = (TYPE*)key_.get();
if (!obj) {
obj = new TYPE;
key_.set(obj);
}
return *obj;
}
/**
* Member reference operator.
* @return the pointer to the inner object.
*/
TYPE* operator ->() {
_assert_(true);
TYPE* obj = (TYPE*)key_.get();
if (!obj) {
obj = new TYPE;
key_.set(obj);
}
return obj;
}
/**
* Cast operator to the original type.
* @return the copy of the inner object.
*/
operator TYPE() const {
_assert_(true);
TYPE* obj = (TYPE*)key_.get();
if (!obj) return TYPE();
return *obj;
}
private:
/**
* Delete the inner object.
* @param obj the inner object.
*/
static void delete_value(void* obj) {
_assert_(true);
delete (TYPE*)obj;
}
/** Dummy constructor to forbid the use. */
TSD(const TSD&);
/** Dummy Operator to forbid the use. */
TSD& operator =(const TSD&);
/** Key of thread specific data. */
TSDKey key_;
};
/**
 * Integer with atomic operations.
 * The primitive operations set(), add(), cas() and get() are only declared here; their bodies
 * are defined elsewhere.  The operators and secure_least() are built on those primitives.
 */
class AtomicInt64 {
 public:
  /**
   * Default constructor.  The value starts at zero.
   */
  explicit AtomicInt64() : value_(0) {
    _assert_(true);
  }
  /**
   * Copy constructor.
   * @param src the source object, whose current value is read with get().
   */
  AtomicInt64(const AtomicInt64& src) : value_(src.get()) {
    _assert_(true);
  }
  /**
   * Constructor.
   * @param num the initial value.
   */
  AtomicInt64(int64_t num) : value_(num) {
    _assert_(true);
  }
  /**
   * Destructor.
   */
  ~AtomicInt64() {
    _assert_(true);
  }
  /**
   * Set the new value.
   * @param val the new value.
   * @return the old value.
   */
  int64_t set(int64_t val);
  /**
   * Add a value.
   * @param val the additional value.
   * @return the old value.
   */
  int64_t add(int64_t val);
  /**
   * Perform compare-and-swap.
   * @param oval the old value.
   * @param nval the new value.
   * @return true on success, or false on failure.
   */
  bool cas(int64_t oval, int64_t nval);
  /**
   * Get the current value.
   * @return the current value.
   */
  int64_t get() const;
  /**
   * Assignment operator from the self type.
   * @param right the right operand.
   * @return the reference to itself.
   */
  AtomicInt64& operator =(const AtomicInt64& right) {
    _assert_(true);
    // Self-assignment leaves the value untouched.
    if (this != &right) set(right.get());
    return *this;
  }
  /**
   * Assignment operator from integer.
   * @param right the right operand.
   * @return the reference to itself.
   */
  AtomicInt64& operator =(const int64_t& right) {
    _assert_(true);
    set(right);
    return *this;
  }
  /**
   * Cast operator to integer.
   * @return the current value.
   */
  operator int64_t() const {
    _assert_(true);
    return get();
  }
  /**
   * Summation assignment operator by integer.
   * @param right the right operand.
   * @return the reference to itself.
   */
  AtomicInt64& operator +=(int64_t right) {
    _assert_(true);
    add(right);
    return *this;
  }
  /**
   * Subtraction assignment operator by integer.
   * @param right the right operand.
   * @return the reference to itself.
   */
  AtomicInt64& operator -=(int64_t right) {
    _assert_(true);
    add(-right);
    return *this;
  }
  /**
   * Secure the least value.
   * Raises the stored value to val unless it is already at least val.
   * @param val the least value.
   * @return the current value.
   */
  int64_t secure_least(int64_t val) {
    _assert_(true);
    int64_t cur = get();
    while (cur < val) {
      // Retry with a fresh reading whenever the compare-and-swap fails.
      if (cas(cur, val)) return val;
      cur = get();
    }
    return cur;
  }
 private:
  /** The value. */
  volatile int64_t value_;
};
/**
* Task queue device.
*/
class TaskQueue {
public:
class Task;
private:
class WorkerThread;
/** An alias of list of tasks. */
typedef std::list TaskList;
public:
/**
* Interface of a task.
*/
class Task {
friend class TaskQueue;
public:
/**
* Default constructor.
*/
explicit Task() : id_(0), thid_(0), aborted_(false) {
_assert_(true);
}
/**
* Destructor.
*/
virtual ~Task() {
_assert_(true);
}
/**
* Get the ID number of the task.
* @return the ID number of the task, which is incremented from 1.
*/
uint64_t id() const {
_assert_(true);
return id_;
}
/**
* Get the ID number of the worker thread.
* @return the ID number of the worker thread. It is from 0 to less than the number of
* worker threads.
*/
uint32_t thread_id() const {
_assert_(true);
return thid_;
}
/**
* Check whether the thread is to be aborted.
* @return true if the thread is to be aborted, or false if not.
*/
bool aborted() const {
_assert_(true);
return aborted_;
}
private:
/** The task ID number. */
uint64_t id_;
/** The thread ID number. */
uint64_t thid_;
/** The flag to be aborted. */
bool aborted_;
};
/**
* Default Constructor.
*/
TaskQueue() : thary_(NULL), thnum_(0), tasks_(), count_(0), mutex_(), cond_(), seed_(0) {
_assert_(true);
}
/**
* Destructor.
*/
virtual ~TaskQueue() {
_assert_(true);
}
/**
* Process a task.
* @param task a task object.
*/
virtual void do_task(Task* task) = 0;
/**
* Process the starting event.
* @param task a task object.
* @note This is called for each thread on starting.
*/
virtual void do_start(const Task* task) {
_assert_(true);
}
/**
* Process the finishing event.
* @param task a task object.
* @note This is called for each thread on finishing.
*/
virtual void do_finish(const Task* task) {
_assert_(true);
}
/**
* Start the task queue.
* @param thnum the number of worker threads.
*/
void start(size_t thnum) {
_assert_(thnum > 0 && thnum <= MEMMAXSIZ);
thary_ = new WorkerThread[thnum];
for (size_t i = 0; i < thnum; i++) {
thary_[i].id_ = i;
thary_[i].queue_ = this;
thary_[i].start();
}
thnum_ = thnum;
}
/**
* Finish the task queue.
* @note This function blocks until all tasks in the queue are popped.
*/
void finish() {
_assert_(true);
mutex_.lock();
TaskList::iterator it = tasks_.begin();
TaskList::iterator itend = tasks_.end();
while (it != itend) {
Task* task = *it;
task->aborted_ = true;
++it;
}
cond_.broadcast();
mutex_.unlock();
Thread::yield();
for (double wsec = 1.0 / CLOCKTICK; true; wsec *= 2) {
mutex_.lock();
if (tasks_.empty()) {
mutex_.unlock();
break;
}
mutex_.unlock();
if (wsec > 1.0) wsec = 1.0;
Thread::sleep(wsec);
}
mutex_.lock();
for (size_t i = 0; i < thnum_; i++) {
thary_[i].aborted_ = true;
}
cond_.broadcast();
mutex_.unlock();
for (size_t i = 0; i < thnum_; i++) {
thary_[i].join();
}
delete[] thary_;
}
/**
* Add a task.
* @param task a task object.
* @return the number of tasks in the queue.
*/
int64_t add_task(Task* task) {
_assert_(task);
mutex_.lock();
task->id_ = ++seed_;
tasks_.push_back(task);
int64_t count = ++count_;
cond_.signal();
mutex_.unlock();
return count;
}
/**
* Get the number of tasks in the queue.
* @return the number of tasks in the queue.
*/
int64_t count() {
_assert_(true);
mutex_.lock();
int64_t count = count_;
mutex_.unlock();
return count;
}
private:
/**
* Implementation of the worker thread.
*/
class WorkerThread : public Thread {
friend class TaskQueue;
public:
explicit WorkerThread() : id_(0), queue_(NULL), aborted_(false) {
_assert_(true);
}
private:
void run() {
_assert_(true);
Task* stask = new Task;
stask->thid_ = id_;
queue_->do_start(stask);
delete stask;
bool empty = false;
while (true) {
queue_->mutex_.lock();
if (aborted_) {
queue_->mutex_.unlock();
break;
}
if (empty) queue_->cond_.wait(&queue_->mutex_, 1.0);
Task * task = NULL;
if (queue_->tasks_.empty()) {
empty = true;
} else {
task = queue_->tasks_.front();
task->thid_ = id_;
queue_->tasks_.pop_front();
queue_->count_--;
empty = false;
}
queue_->mutex_.unlock();
if (task) queue_->do_task(task);
}
Task* ftask = new Task;
ftask->thid_ = id_;
ftask->aborted_ = true;
queue_->do_finish(ftask);
delete ftask;
}
uint32_t id_;
TaskQueue* queue_;
Task* task_;
bool aborted_;
};
/** Dummy constructor to forbid the use. */
TaskQueue(const TaskQueue&);
/** Dummy Operator to forbid the use. */
TaskQueue& operator =(const TaskQueue&);
/** The array of worker threads. */
WorkerThread* thary_;
/** The number of worker threads. */
size_t thnum_;
/** The list of tasks. */
TaskList tasks_;
/** The number of the tasks. */
int64_t count_;
/** The mutex for the task list. */
Mutex mutex_;
/** The condition variable for the task list. */
CondVar cond_;
/** The seed of ID numbers. */
uint64_t seed_;
};
} // common namespace
#endif // duplication check
// END OF FILE