/*************************************************************************************************
* Prototype database
* Copyright (C) 2009-2012 FAL Labs
* This file is part of Kyoto Cabinet.
* This program is free software: you can redistribute it and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation, either version
* 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
* You should have received a copy of the GNU General Public License along with this program.
* If not, see .
*************************************************************************************************/
#ifndef _KCPROTODB_H // duplication check
#define _KCPROTODB_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace kyotocabinet { // common namespace
/**
* Prototype implementation of database with STL.
* @param STRMAP a class compatible with the map class of STL.
* @param DBTYPE the database type number of the class.
* @note This class template is a template for concrete classes which wrap data structures
* compatible with std::map. Template instance classes can be inherited but overwriting methods
* is forbidden. The class ProtoHashDB is the instance using std::unordered_map. The class
* ProtoTreeDB is the instance using std::map. Before every database operation, it is necessary
* to call the BasicDB::open method in order to open a database file and connect the database
* object to it. To avoid data missing or corruption, it is important to close every database
* file by the BasicDB::close method when the database is no longer in use. It is forbidden for
* multible database objects in a process to open the same database at the same time. It is
* forbidden to share a database object with child processes.
*/
template
class ProtoDB : public BasicDB {
public:
class Cursor;
private:
struct TranLog;
class ScopedVisitor;
/** An alias of list of cursors. */
typedef std::list CursorList;
/** An alias of list of transaction logs. */
typedef std::list TranLogList;
/** The size of the opaque buffer. */
static const size_t OPAQUESIZ = 16;
/** The threshold of busy loop and sleep for locking. */
static const uint32_t LOCKBUSYLOOP = 8192;
public:
/**
* Cursor to indicate a record.
*/
class Cursor : public BasicDB::Cursor {
friend class ProtoDB;
public:
/**
* Constructor.
* @param db the container database object.
*/
explicit Cursor(ProtoDB* db) : db_(db), it_(db->recs_.end()) {
_assert_(db);
// ScopedRWLock lock(&db_->mlock_, true);
db_->curs_.push_back(this);
}
/**
* Destructor.
*/
virtual ~Cursor() {
_assert_(true);
if (!db_) return;
// ScopedRWLock lock(&db_->mlock_, true);
db_->curs_.remove(this);
}
/**
* Accept a visitor to the current record.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @param step true to move the cursor to the next record, or false for no move.
* @return true on success, or false on failure.
* @note The operation for each record is performed atomically and other threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool accept(Visitor* visitor, bool writable = true, bool step = false) {
_assert_(visitor);
// ScopedRWLock lock(&db_->mlock_, true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (writable && !(db_->omode_ & OWRITER)) {
db_->set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
if (it_ == db_->recs_.end()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
const std::string& key = it_->first;
const std::string& value = it_->second;
size_t vsiz;
const char* vbuf = visitor->visit_full(key.data(), key.size(),
value.data(), value.size(), &vsiz);
if (vbuf == Visitor::REMOVE) {
if (db_->tran_) {
TranLog log(key, value);
db_->trlogs_.push_back(log);
}
db_->size_ -= key.size() + value.size();
if (db_->curs_.size() > 1) {
typename CursorList::const_iterator cit = db_->curs_.begin();
typename CursorList::const_iterator citend = db_->curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
if (cur != this && cur->it_ == it_) ++cur->it_;
++cit;
}
}
db_->recs_.erase(it_++);
} else if (vbuf == Visitor::NOP) {
if (step) ++it_;
} else {
if (db_->tran_) {
TranLog log(key, value);
db_->trlogs_.push_back(log);
}
db_->size_ -= value.size();
db_->size_ += vsiz;
it_->second = std::string(vbuf, vsiz);
if (step) ++it_;
}
return true;
}
/**
* Jump the cursor to the first record for forward scan.
* @return true on success, or false on failure.
*/
bool jump() {
_assert_(true);
// ScopedRWLock lock(&db_->mlock_, true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
it_ = db_->recs_.begin();
if (it_ == db_->recs_.end()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
return true;
}
/**
* Jump the cursor to a record for forward scan.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return true on success, or false on failure.
*/
bool jump(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
// ScopedRWLock lock(&db_->mlock_, true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
std::string key(kbuf, ksiz);
search(key);
if (it_ == db_->recs_.end()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
return true;
}
/**
* Jump the cursor to a record for forward scan.
* @note Equal to the original Cursor::jump method except that the parameter is std::string.
*/
bool jump(const std::string& key) {
_assert_(true);
return jump(key.data(), key.size());
}
/**
* Jump the cursor to the last record for backward scan.
* @return true on success, or false on failure.
*/
bool jump_back() {
_assert_(true);
// ScopedRWLock lock(&db_->mlock_, true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
it_ = db_->recs_.end();
if (it_ == db_->recs_.begin()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
if (!iter_back()) {
db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
return false;
}
return true;
}
/**
* Jump the cursor to a record for backward scan.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return true on success, or false on failure.
*/
bool jump_back(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
// ScopedRWLock lock(&db_->mlock_, true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
std::string key(kbuf, ksiz);
search(key);
if (it_ == db_->recs_.end()) {
if (it_ == db_->recs_.begin()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
if (!iter_back()) {
db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
return false;
}
} else {
std::string key(kbuf, ksiz);
if (key < it_->first) {
if (it_ == db_->recs_.begin()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
it_ = db_->recs_.end();
return false;
}
if (!iter_back()) {
db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
it_ = db_->recs_.end();
return false;
}
}
}
return true;
}
/**
* Jump the cursor to a record for backward scan.
* @note Equal to the original Cursor::jump_back method except that the parameter is
* std::string.
*/
bool jump_back(const std::string& key) {
_assert_(true);
return jump_back(key.data(), key.size());
}
/**
* Step the cursor to the next record.
* @return true on success, or false on failure.
*/
bool step() {
_assert_(true);
// ScopedRWLock lock(&db_->mlock_, true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (it_ == db_->recs_.end()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
++it_;
if (it_ == db_->recs_.end()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
return true;
}
/**
* Step the cursor to the previous record.
* @return true on success, or false on failure.
*/
bool step_back() {
_assert_(true);
// ScopedRWLock lock(&db_->mlock_, true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (it_ == db_->recs_.begin()) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
it_ = db_->recs_.end();
return false;
}
if (!iter_back()) {
db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented");
it_ = db_->recs_.end();
return false;
}
return true;
}
/**
* Get the database object.
* @return the database object.
*/
ProtoDB* db() {
_assert_(true);
return db_;
}
private:
/**
* Search for a record.
*/
void search(const std::string& key);
/**
* Place back the inner iterator.
* @return true on success, or false on failure.
*/
bool iter_back();
/** Dummy constructor to forbid the use. */
Cursor(const Cursor&);
/** Dummy Operator to forbid the use. */
Cursor& operator =(const Cursor&);
/** The inner database. */
ProtoDB* db_;
/** The inner iterator. */
typename STRMAP::iterator it_;
};
/**
* Default constructor.
*/
explicit ProtoDB() :
error_(), logger_(NULL), logkinds_(0), mtrigger_(NULL),
omode_(0), recs_(), curs_(), path_(""), size_(0), opaque_(),
tran_(false), trlogs_(), trsize_(0) {
_assert_(true);
map_tune();
}
/**
* Destructor.
* @note If the database is not closed, it is closed implicitly.
*/
virtual ~ProtoDB() {
_assert_(true);
if (omode_ != 0) close();
if (!curs_.empty()) {
typename CursorList::const_iterator cit = curs_.begin();
typename CursorList::const_iterator citend = curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
cur->db_ = NULL;
++cit;
}
}
}
/**
* Accept a visitor to a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @return true on success, or false on failure.
* @note The operation for each record is performed atomically and other threads accessing the
* same record are blocked. To avoid deadlock, any explicit database operation must not be
* performed in this function.
*/
bool accept(const char* kbuf, size_t ksiz, Visitor* visitor, bool writable = true) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && visitor);
if (writable) {
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (!(omode_ & OWRITER)) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
std::string key(kbuf, ksiz);
typename STRMAP::iterator it = recs_.find(key);
if (it == recs_.end()) {
size_t vsiz;
const char* vbuf = visitor->visit_empty(kbuf, ksiz, &vsiz);
if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
if (tran_) {
TranLog log(key);
trlogs_.push_back(log);
}
size_ += ksiz + vsiz;
recs_[key] = std::string(vbuf, vsiz);
}
} else {
const std::string& value = it->second;
size_t vsiz;
const char* vbuf = visitor->visit_full(kbuf, ksiz, value.data(), value.size(), &vsiz);
if (vbuf == Visitor::REMOVE) {
if (tran_) {
TranLog log(key, value);
trlogs_.push_back(log);
}
size_ -= ksiz + value.size();
if (!curs_.empty()) {
typename CursorList::const_iterator cit = curs_.begin();
typename CursorList::const_iterator citend = curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
if (cur->it_ == it) ++cur->it_;
++cit;
}
}
recs_.erase(it);
} else if (vbuf != Visitor::NOP) {
if (tran_) {
TranLog log(key, value);
trlogs_.push_back(log);
}
size_ -= value.size();
size_ += vsiz;
it->second = std::string(vbuf, vsiz);
}
}
} else {
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
std::string key(kbuf, ksiz);
const STRMAP& rrecs = recs_;
typename STRMAP::const_iterator it = rrecs.find(key);
if (it == rrecs.end()) {
size_t vsiz;
const char* vbuf = visitor->visit_empty(kbuf, ksiz, &vsiz);
if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
} else {
const std::string& value = it->second;
size_t vsiz;
const char* vbuf = visitor->visit_full(kbuf, ksiz, value.data(), value.size(), &vsiz);
if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
}
}
return true;
}
/**
* Accept a visitor to multiple records at once.
* @param keys specifies a string vector of the keys.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @return true on success, or false on failure.
* @note The operations for specified records are performed atomically and other threads
* accessing the same records are blocked. To avoid deadlock, any explicit database operation
* must not be performed in this function.
*/
bool accept_bulk(const std::vector& keys, Visitor* visitor,
bool writable = true) {
_assert_(visitor);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (!(omode_ & OWRITER)) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
ScopedVisitor svis(visitor);
if (keys.empty()) return true;
std::vector::const_iterator kit = keys.begin();
std::vector::const_iterator kitend = keys.end();
while (kit != kitend) {
const std::string& key = *kit;
typename STRMAP::iterator it = recs_.find(key);
if (it == recs_.end()) {
size_t vsiz;
const char* vbuf = visitor->visit_empty(key.data(), key.size(), &vsiz);
if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
if (tran_) {
TranLog log(key);
trlogs_.push_back(log);
}
size_ += key.size() + vsiz;
recs_[key] = std::string(vbuf, vsiz);
}
} else {
const std::string& value = it->second;
size_t vsiz;
const char* vbuf = visitor->visit_full(key.data(), key.size(),
value.data(), value.size(), &vsiz);
if (vbuf == Visitor::REMOVE) {
if (tran_) {
TranLog log(key, value);
trlogs_.push_back(log);
}
size_ -= key.size() + value.size();
if (!curs_.empty()) {
typename CursorList::const_iterator cit = curs_.begin();
typename CursorList::const_iterator citend = curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
if (cur->it_ == it) ++cur->it_;
++cit;
}
}
recs_.erase(it);
} else if (vbuf != Visitor::NOP) {
if (tran_) {
TranLog log(key, value);
trlogs_.push_back(log);
}
size_ -= value.size();
size_ += vsiz;
it->second = std::string(vbuf, vsiz);
}
}
++kit;
}
return true;
}
/**
* Iterate to accept a visitor for each record.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure.
* @note The whole iteration is performed atomically and other threads are blocked. To avoid
* deadlock, any explicit database operation must not be performed in this function.
*/
bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* checker = NULL) {
_assert_(visitor);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (writable && !(omode_ & OWRITER)) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
ScopedVisitor svis(visitor);
int64_t allcnt = recs_.size();
if (checker && !checker->check("iterate", "beginning", 0, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
typename STRMAP::iterator it = recs_.begin();
typename STRMAP::iterator itend = recs_.end();
int64_t curcnt = 0;
while (it != itend) {
const std::string& key = it->first;
const std::string& value = it->second;
size_t vsiz;
const char* vbuf = visitor->visit_full(key.data(), key.size(),
value.data(), value.size(), &vsiz);
if (vbuf == Visitor::REMOVE) {
size_ -= key.size() + value.size();
recs_.erase(it++);
} else if (vbuf == Visitor::NOP) {
++it;
} else {
size_ -= value.size();
size_ += vsiz;
it->second = std::string(vbuf, vsiz);
++it;
}
curcnt++;
if (checker && !checker->check("iterate", "processing", curcnt, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
}
if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
trigger_meta(MetaTrigger::ITERATE, "iterate");
return true;
}
/**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones. The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* checker = NULL) {
_assert_(visitor && thnum <= MEMMAXSIZ);
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (thnum < 1) thnum = 1;
if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
ScopedVisitor svis(visitor);
int64_t allcnt = recs_.size();
if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
class ThreadImpl : public Thread {
public:
explicit ThreadImpl() :
db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0),
itp_(NULL), itend_(), itmtx_(NULL), error_() {}
void init(ProtoDB* db, Visitor* visitor, ProgressChecker* checker, int64_t allcnt,
typename STRMAP::const_iterator* itp, typename STRMAP::const_iterator itend,
Mutex* itmtx) {
db_ = db;
visitor_ = visitor;
checker_ = checker;
allcnt_ = allcnt;
itp_ = itp;
itend_ = itend;
itmtx_ = itmtx;
}
const Error& error() {
return error_;
}
private:
void run() {
ProtoDB* db = db_;
Visitor* visitor = visitor_;
ProgressChecker* checker = checker_;
int64_t allcnt = allcnt_;
typename STRMAP::const_iterator* itp = itp_;
typename STRMAP::const_iterator itend = itend_;
Mutex* itmtx = itmtx_;
while (true) {
itmtx->lock();
if (*itp == itend) {
itmtx->unlock();
break;
}
const std::string& key = (*itp)->first;
const std::string& value = (*itp)->second;
++(*itp);
itmtx->unlock();
size_t vsiz;
visitor->visit_full(key.data(), key.size(), value.data(), value.size(), &vsiz);
if (checker && !checker->check("scan_parallel", "processing", -1, allcnt)) {
db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
error_ = db->error();
break;
}
}
}
ProtoDB* db_;
Visitor* visitor_;
ProgressChecker* checker_;
int64_t allcnt_;
typename STRMAP::const_iterator* itp_;
typename STRMAP::const_iterator itend_;
Mutex* itmtx_;
Error error_;
};
bool err = false;
typename STRMAP::const_iterator it = recs_.begin();
typename STRMAP::const_iterator itend = recs_.end();
Mutex itmtx;
ThreadImpl* threads = new ThreadImpl[thnum];
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->init(this, visitor, checker, allcnt, &it, itend, &itmtx);
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->start();
}
for (size_t i = 0; i < thnum; i++) {
ThreadImpl* thread = threads + i;
thread->join();
if (thread->error() != Error::SUCCESS) {
*error_ = thread->error();
err = true;
}
}
delete[] threads;
if (err) return false;
if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
return true;
}
/**
* Get the last happened error.
* @return the last happened error.
*/
Error error() const {
_assert_(true);
return error_;
}
/**
* Set the error information.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param code an error code.
* @param message a supplement message.
*/
void set_error(const char* file, int32_t line, const char* func,
Error::Code code, const char* message) {
_assert_(file && line > 0 && func && message);
error_->set(code, message);
if (logger_) {
Logger::Kind kind = code == Error::BROKEN || code == Error::SYSTEM ?
Logger::ERROR : Logger::INFO;
if (kind & logkinds_)
report(file, line, func, kind, "%d: %s: %s", code, Error::codename(code), message);
}
}
/**
* Open a database file.
* @param path the path of a database file.
* @param mode the connection mode. BasicDB::OWRITER as a writer, BasicDB::OREADER as a
* reader. The following may be added to the writer mode by bitwise-or: BasicDB::OCREATE,
* which means it creates a new database if the file does not exist, BasicDB::OTRUNCATE, which
* means it creates a new database regardless if the file exists, BasicDB::OAUTOTRAN, which
* means each updating operation is performed in implicit transaction, BasicDB::OAUTOSYNC,
* which means each updating operation is followed by implicit synchronization with the file
* system. The following may be added to both of the reader mode and the writer mode by
* bitwise-or: BasicDB::ONOLOCK, which means it opens the database file without file locking,
* BasicDB::OTRYLOCK, which means locking is performed without blocking, BasicDB::ONOREPAIR,
* which means the database file is not repaired implicitly even if file destruction is
* detected.
* @return true on success, or false on failure.
* @note Every opened database must be closed by the BasicDB::close method when it is no
* longer in use. It is not allowed for two or more database objects in the same process to
* keep their connections to the same database file at the same time.
*/
bool open(const std::string& path, uint32_t mode = OWRITER | OCREATE) {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ != 0) {
set_error(_KCCODELINE_, Error::INVALID, "already opened");
return false;
}
report(_KCCODELINE_, Logger::DEBUG, "opening the database (path=%s)", path.c_str());
omode_ = mode;
path_.append(path);
std::memset(opaque_, 0, sizeof(opaque_));
trigger_meta(MetaTrigger::OPEN, "open");
return true;
}
/**
* Close the database file.
* @return true on success, or false on failure.
*/
bool close() {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
report(_KCCODELINE_, Logger::DEBUG, "closing the database (path=%s)", path_.c_str());
tran_ = false;
trlogs_.clear();
recs_.clear();
if (!curs_.empty()) {
typename CursorList::const_iterator cit = curs_.begin();
typename CursorList::const_iterator citend = curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
cur->it_ = recs_.end();
++cit;
}
}
path_.clear();
omode_ = 0;
trigger_meta(MetaTrigger::CLOSE, "close");
return true;
}
/**
* Synchronize updated contents with the file and the device.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @param proc a postprocessor object. If it is NULL, no postprocessing is performed.
* @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure.
* @note The operation of the postprocessor is performed atomically and other threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool synchronize(bool hard = false, FileProcessor* proc = NULL,
ProgressChecker* checker = NULL) {
_assert_(true);
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
bool err = false;
if ((omode_ & OWRITER) && checker &&
!checker->check("synchronize", "nothing to be synchronized", -1, -1)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
if (proc) {
if (checker && !checker->check("synchronize", "running the post processor", -1, -1)) {
set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
return false;
}
if (!proc->process(path_, recs_.size(), size_)) {
set_error(_KCCODELINE_, Error::LOGIC, "postprocessing failed");
err = true;
}
}
trigger_meta(MetaTrigger::SYNCHRONIZE, "synchronize");
return !err;
}
/**
* Occupy database by locking and do something meanwhile.
* @param writable true to use writer lock, or false to use reader lock.
* @param proc a processor object. If it is NULL, no processing is performed.
* @return true on success, or false on failure.
* @note The operation of the processor is performed atomically and other threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool occupy(bool writable = true, FileProcessor* proc = NULL) {
_assert_(true);
// ScopedRWLock lock(&mlock_, writable);
bool err = false;
if (proc && !proc->process(path_, recs_.size(), size_)) {
set_error(_KCCODELINE_, Error::LOGIC, "processing failed");
err = true;
}
trigger_meta(MetaTrigger::OCCUPY, "occupy");
return !err;
}
/**
* Begin transaction.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @return true on success, or false on failure.
*/
bool begin_transaction(bool hard = false) {
_assert_(true);
uint32_t wcnt = 0;
while (true) {
// mlock_.lock_writer();
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
// mlock_.unlock();
return false;
}
if (!(omode_ & OWRITER)) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
// mlock_.unlock();
return false;
}
if (!tran_) break;
// mlock_.unlock();
if (wcnt >= LOCKBUSYLOOP) {
Thread::chill();
} else {
Thread::yield();
wcnt++;
}
}
tran_ = true;
trsize_ = size_;
trigger_meta(MetaTrigger::BEGINTRAN, "begin_transaction");
// mlock_.unlock();
return true;
}
/**
* Try to begin transaction.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @return true on success, or false on failure.
*/
bool begin_transaction_try(bool hard = false) {
_assert_(true);
// mlock_.lock_writer();
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
// mlock_.unlock();
return false;
}
if (!(omode_ & OWRITER)) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
// mlock_.unlock();
return false;
}
if (tran_) {
set_error(_KCCODELINE_, Error::LOGIC, "competition avoided");
// mlock_.unlock();
return false;
}
tran_ = true;
trsize_ = size_;
trigger_meta(MetaTrigger::BEGINTRAN, "begin_transaction_try");
// mlock_.unlock();
return true;
}
/**
* End transaction.
* @param commit true to commit the transaction, or false to abort the transaction.
* @return true on success, or false on failure.
*/
bool end_transaction(bool commit = true) {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (!tran_) {
set_error(_KCCODELINE_, Error::INVALID, "not in transaction");
return false;
}
if (!commit) {
if (!curs_.empty()) {
typename CursorList::const_iterator cit = curs_.begin();
typename CursorList::const_iterator citend = curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
cur->it_ = recs_.end();
++cit;
}
}
const TranLogList& logs = trlogs_;
typename TranLogList::const_iterator lit = logs.end();
typename TranLogList::const_iterator litbeg = logs.begin();
while (lit != litbeg) {
--lit;
if (lit->full) {
recs_[lit->key] = lit->value;
} else {
recs_.erase(lit->key);
}
}
size_ = trsize_;
}
trlogs_.clear();
tran_ = false;
trigger_meta(commit ? MetaTrigger::COMMITTRAN : MetaTrigger::ABORTTRAN, "end_transaction");
return true;
}
/**
* Remove all records.
* @return true on success, or false on failure.
*/
bool clear() {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
recs_.clear();
if (!curs_.empty()) {
typename CursorList::const_iterator cit = curs_.begin();
typename CursorList::const_iterator citend = curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
cur->it_ = recs_.end();
++cit;
}
}
std::memset(opaque_, 0, sizeof(opaque_));
trigger_meta(MetaTrigger::CLEAR, "clear");
return true;
}
/**
* Get the number of records.
* @return the number of records, or -1 on failure.
*/
int64_t count() {
_assert_(true);
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return -1;
}
return recs_.size();
}
/**
* Get the size of the database file.
* @return the size of the database file in bytes, or -1 on failure.
*/
int64_t size() {
_assert_(true);
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return -1;
}
return size_;
}
/**
* Get the path of the database file.
* @return the path of the database file, or an empty string on failure.
*/
std::string path() {
_assert_(true);
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return "";
}
return path_;
}
/**
* Get the miscellaneous status information.
* @param strmap a string map to contain the result.
* @return true on success, or false on failure.
*/
bool status(std::map* strmap) {
_assert_(strmap);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
(*strmap)["type"] = strprintf("%u", (unsigned)DBTYPE);
(*strmap)["realtype"] = strprintf("%u", (unsigned)DBTYPE);
(*strmap)["path"] = path_;
if (strmap->count("opaque") > 0)
(*strmap)["opaque"] = std::string(opaque_, sizeof(opaque_));
(*strmap)["count"] = strprintf("%lld", (long long)recs_.size());
(*strmap)["size"] = strprintf("%lld", (long long)size_);
return true;
}
/**
* Create a cursor object.
* @return the return value is the created cursor object.
* @note Because the object of the return value is allocated by the constructor, it should be
* released with the delete operator when it is no longer in use.
*/
Cursor* cursor() {
_assert_(true);
return new Cursor(this);
}
/**
* Write a log message.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param message the supplement message.
*/
void log(const char* file, int32_t line, const char* func, Logger::Kind kind,
const char* message) {
_assert_(file && line > 0 && func && message);
// ScopedRWLock lock(&mlock_, false);
if (!logger_) return;
logger_->log(file, line, func, kind, message);
}
/**
* Set the internal logger.
* @param logger the logger object.
* @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
* Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal
* error.
* @return true on success, or false on failure.
*/
bool tune_logger(Logger* logger, uint32_t kinds = Logger::WARN | Logger::ERROR) {
_assert_(logger);
// ScopedRWLock lock(&mlock_, true);
if (omode_ != 0) {
set_error(_KCCODELINE_, Error::INVALID, "already opened");
return false;
}
logger_ = logger;
logkinds_ = kinds;
return true;
}
/**
* Set the internal meta operation trigger.
* @param trigger the trigger object.
* @return true on success, or false on failure.
*/
bool tune_meta_trigger(MetaTrigger* trigger) {
_assert_(trigger);
// ScopedRWLock lock(&mlock_, true);
if (omode_ != 0) {
set_error(_KCCODELINE_, Error::INVALID, "already opened");
return false;
}
mtrigger_ = trigger;
return true;
}
/**
* Get the opaque data.
* @return the pointer to the opaque data region, whose size is 16 bytes.
*/
char* opaque() {
_assert_(true);
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return NULL;
}
return opaque_;
}
/**
* Synchronize the opaque data.
* @return true on success, or false on failure.
*/
bool synchronize_opaque() {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (!(omode_ & OWRITER)) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
return true;
}
protected:
/**
* Report a message for debugging.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param format the printf-like format string.
* @param ... used according to the format string.
*/
void report(const char* file, int32_t line, const char* func, Logger::Kind kind,
const char* format, ...) {
_assert_(file && line > 0 && func && format);
if (!logger_ || !(kind & logkinds_)) return;
std::string message;
strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str());
va_list ap;
va_start(ap, format);
vstrprintf(&message, format, ap);
va_end(ap);
logger_->log(file, line, func, kind, message.c_str());
}
/**
* Report a message for debugging with variable number of arguments.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param format the printf-like format string.
* @param ap used according to the format string.
*/
void report_valist(const char* file, int32_t line, const char* func, Logger::Kind kind,
const char* format, va_list ap) {
_assert_(file && line > 0 && func && format);
if (!logger_ || !(kind & logkinds_)) return;
std::string message;
strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str());
vstrprintf(&message, format, ap);
logger_->log(file, line, func, kind, message.c_str());
}
/**
* Report the content of a binary buffer for debugging.
* @param file the file name of the epicenter.
* @param line the line number of the epicenter.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param name the name of the information.
* @param buf the binary buffer.
* @param size the size of the binary buffer
*/
void report_binary(const char* file, int32_t line, const char* func, Logger::Kind kind,
const char* name, const char* buf, size_t size) {
_assert_(file && line > 0 && func && name && buf && size <= MEMMAXSIZ);
if (!logger_) return;
char* hex = hexencode(buf, size);
report(file, line, func, kind, "%s=%s", name, hex);
delete[] hex;
}
/**
* Trigger a meta database operation.
* @param kind the kind of the event. MetaTrigger::OPEN for opening, MetaTrigger::CLOSE for
* closing, MetaTrigger::CLEAR for clearing, MetaTrigger::ITERATE for iteration,
* MetaTrigger::SYNCHRONIZE for synchronization, MetaTrigger::BEGINTRAN for beginning
* transaction, MetaTrigger::COMMITTRAN for committing transaction, MetaTrigger::ABORTTRAN
* for aborting transaction, and MetaTrigger::MISC for miscellaneous operations.
* @param message the supplement message.
*/
void trigger_meta(MetaTrigger::Kind kind, const char* message) {
_assert_(message);
if (mtrigger_) mtrigger_->trigger(kind, message);
}
private:
/**
* Transaction log.
*/
struct TranLog {
bool full; ///< flag whether full
std::string key; ///< old key
std::string value; ///< old value
/** constructor for a full record */
explicit TranLog(const std::string& pkey, const std::string& pvalue) :
full(true), key(pkey), value(pvalue) {
_assert_(true);
}
/** constructor for an empty record */
explicit TranLog(const std::string& pkey) : full(false), key(pkey) {
_assert_(true);
}
};
/**
* Scoped visitor.
*/
class ScopedVisitor {
public:
/** constructor */
explicit ScopedVisitor(Visitor* visitor) : visitor_(visitor) {
_assert_(visitor);
visitor_->visit_before();
}
/** destructor */
~ScopedVisitor() {
_assert_(true);
visitor_->visit_after();
}
private:
Visitor* visitor_; ///< visitor
};
/**
* Tune the internal map object.
*/
void map_tune();
/** Dummy constructor to forbid the use. */
ProtoDB(const ProtoDB&);
/** Dummy Operator to forbid the use. */
ProtoDB& operator =(const ProtoDB&);
/** The last happened error. */
TSD error_;
/** The internal logger. */
Logger* logger_;
/** The kinds of logged messages. */
uint32_t logkinds_;
/** The internal meta operation trigger. */
MetaTrigger* mtrigger_;
/** The open mode. */
uint32_t omode_;
/** The map of records. */
STRMAP recs_;
/** The cursor objects. */
CursorList curs_;
/** The path of the database file. */
std::string path_;
/** The total size of records. */
int64_t size_;
/** The opaque data. */
char opaque_[OPAQUESIZ];
/** The flag whether in transaction. */
bool tran_;
/** The transaction logs. */
TranLogList trlogs_;
/** The old size before transaction. */
size_t trsize_;
};
/**
* Search for a record.
*/
template
inline void ProtoDB::Cursor::search(const std::string& key) {
_assert_(true);
it_ = db_->recs_.find(key);
}
/**
* Search for a record.
*/
template <> /** specialization for StringTreeMap */
inline void ProtoDB::Cursor::search(const std::string& key) {
_assert_(true);
it_ = db_->recs_.lower_bound(key);
}
/**
* Place back the inner iterator.
*/
template
inline bool ProtoDB::Cursor::iter_back() {
_assert_(true);
return false;
}
/**
* Place back the inner iterator.
*/
template <> /** specialization for StringTreeMap */
inline bool ProtoDB::Cursor::iter_back() {
_assert_(true);
--it_;
return true;
}
/**
* Tune the internal map object.
*/
template
inline void ProtoDB::map_tune() {
_assert_(true);
}
/**
* Tune the internal map object.
*/
template <> /** specialization for StringTreeMap */
inline void ProtoDB::map_tune() {
_assert_(true);
recs_.rehash(1048583LL);
recs_.max_load_factor(FLTMAX);
}
/** An alias of the prototype hash database. */
typedef ProtoDB ProtoHashDB;
/** An alias of the prototype tree database. */
typedef ProtoDB ProtoTreeDB;
} // common namespace
#endif // duplication check
// END OF FILE