/*************************************************************************************************
* Database extension
* Copyright (C) 2009-2012 FAL Labs
* This file is part of Kyoto Cabinet.
* This program is free software: you can redistribute it and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation, either version
* 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
* You should have received a copy of the GNU General Public License along with this program.
* If not, see .
*************************************************************************************************/
#ifndef _KCDBEXT_H // duplication check
#define _KCDBEXT_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace kyotocabinet { // common namespace
/**
* MapReduce framework.
* @note Although this framework is not distributed or concurrent, it is useful for aggregate
* calculation with less CPU loading and less memory usage.
*/
class MapReduce {
public:
class ValueIterator;
private:
class FlushThread;
class ReduceTaskQueue;
class MapVisitor;
struct MergeLine;
/** An alias of vector of loaded values. */
typedef std::vector Values;
/** The default number of temporary databases. */
static const size_t DEFDBNUM = 8;
/** The maxinum number of temporary databases. */
static const size_t MAXDBNUM = 256;
/** The default cache limit. */
static const int64_t DEFCLIM = 512LL << 20;
/** The default cache bucket numer. */
static const int64_t DEFCBNUM = 1048583LL;
/** The bucket number of temprary databases. */
static const int64_t DBBNUM = 512LL << 10;
/** The page size of temprary databases. */
static const int32_t DBPSIZ = 32768;
/** The mapped size of temprary databases. */
static const int64_t DBMSIZ = 516LL * 4096;
/** The page cache capacity of temprary databases. */
static const int64_t DBPCCAP = 16LL << 20;
/** The default number of threads in parallel mode. */
static const size_t DEFTHNUM = 8;
/** The number of slots of the record lock. */
static const int32_t RLOCKSLOT = 256;
public:
/**
* Value iterator for the reducer.
*/
class ValueIterator {
friend class MapReduce;
public:
/**
* Get the next value.
* @param sp the pointer to the variable into which the size of the region of the return
* value is assigned.
* @return the pointer to the next value region, or NULL if no value remains.
*/
const char* next(size_t* sp) {
_assert_(sp);
if (!vptr_) {
if (vit_ == vend_) return NULL;
vptr_ = vit_->data();
vsiz_ = vit_->size();
vit_++;
}
uint64_t vsiz;
size_t step = readvarnum(vptr_, vsiz_, &vsiz);
vptr_ += step;
vsiz_ -= step;
const char* vbuf = vptr_;
*sp = vsiz;
vptr_ += vsiz;
vsiz_ -= vsiz;
if (vsiz_ < 1) vptr_ = NULL;
return vbuf;
}
private:
/**
* Default constructor.
*/
explicit ValueIterator(Values::const_iterator vit, Values::const_iterator vend) :
vit_(vit), vend_(vend), vptr_(NULL), vsiz_(0) {
_assert_(true);
}
/**
* Destructor.
*/
~ValueIterator() {
_assert_(true);
}
/** Dummy constructor to forbid the use. */
ValueIterator(const ValueIterator&);
/** Dummy Operator to forbid the use. */
ValueIterator& operator =(const ValueIterator&);
/** The current iterator of loaded values. */
Values::const_iterator vit_;
/** The ending iterator of loaded values. */
Values::const_iterator vend_;
/** The pointer of the current value. */
const char* vptr_;
/** The size of the current value. */
size_t vsiz_;
};
/**
* Execution options.
*/
enum Option {
XNOLOCK = 1 << 0, ///< avoid locking against update operations
XPARAMAP = 1 << 1, ///< run mappers in parallel
XPARARED = 1 << 2, ///< run reducers in parallel
XPARAFLS = 1 << 3, ///< run cache flushers in parallel
XNOCOMP = 1 << 8 ///< avoid compression of temporary databases
};
/**
* Default constructor.
*/
explicit MapReduce() :
db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0),
mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM),
cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NULL),
redtasks_(NULL), redaborted_(false), rlocks_(NULL) {
_assert_(true);
}
/**
* Destructor.
*/
virtual ~MapReduce() {
_assert_(true);
}
/**
* Map a record data.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
* @note This function can call the MapReduce::emit method to emit a record. To avoid
* deadlock, any explicit database operation must not be performed in this function.
*/
virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) = 0;
/**
* Reduce a record data.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param iter the iterator to get the values.
* @return true on success, or false on failure.
* @note To avoid deadlock, any explicit database operation must not be performed in this
* function.
*/
virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0;
/**
* Preprocess the map operations.
* @return true on success, or false on failure.
* @note This function can call the MapReduce::emit method to emit a record. To avoid
* deadlock, any explicit database operation must not be performed in this function.
*/
virtual bool preprocess() {
_assert_(true);
return true;
}
/**
* Mediate between the map and the reduce phases.
* @return true on success, or false on failure.
* @note This function can call the MapReduce::emit method to emit a record. To avoid
* deadlock, any explicit database operation must not be performed in this function.
*/
virtual bool midprocess() {
_assert_(true);
return true;
}
/**
* Postprocess the reduce operations.
* @return true on success, or false on failure.
* @note To avoid deadlock, any explicit database operation must not be performed in this
* function.
*/
virtual bool postprocess() {
_assert_(true);
return true;
}
/**
* Process a log message.
* @param name the name of the event.
* @param message a supplement message.
* @return true on success, or false on failure.
*/
virtual bool log(const char* name, const char* message) {
_assert_(name && message);
return true;
}
/**
* Execute the MapReduce process about a database.
* @param db the source database.
* @param tmppath the path of a directory for the temporary data storage. If it is an empty
* string, temporary data are handled on memory.
* @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking
* against update operations by other threads, MapReduce::XPARAMAP to run the mapper in
* parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduce::XNOCOMP to avoid
* compression of temporary databases.
* @return true on success, or false on failure.
*/
bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) {
int64_t count = db->count();
if (count < 0) {
if (db->error() != BasicDB::Error::NOIMPL) return false;
count = 0;
}
bool err = false;
double stime, etime;
db_ = db;
rcomp_ = LEXICALCOMP;
BasicDB* idb = db;
if (typeid(*db) == typeid(PolyDB)) {
PolyDB* pdb = (PolyDB*)idb;
idb = pdb->reveal_inner_db();
}
const std::type_info& info = typeid(*idb);
if (info == typeid(GrassDB)) {
GrassDB* gdb = (GrassDB*)idb;
rcomp_ = gdb->rcomp();
} else if (info == typeid(TreeDB)) {
TreeDB* tdb = (TreeDB*)idb;
rcomp_ = tdb->rcomp();
} else if (info == typeid(ForestDB)) {
ForestDB* fdb = (ForestDB*)idb;
rcomp_ = fdb->rcomp();
}
tmpdbs_ = new BasicDB*[dbnum_];
if (tmppath.empty()) {
if (!logf("prepare", "started to open temporary databases on memory")) err = true;
stime = time();
for (size_t i = 0; i < dbnum_; i++) {
GrassDB* gdb = new GrassDB;
int32_t myopts = 0;
if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS;
gdb->tune_options(myopts);
gdb->tune_buckets(DBBNUM / 2);
gdb->tune_page(DBPSIZ);
gdb->tune_page_cache(DBPCCAP);
gdb->tune_comparator(rcomp_);
gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE);
tmpdbs_[i] = gdb;
}
etime = time();
if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime))
err = true;
if (err) {
delete[] tmpdbs_;
return false;
}
} else {
File::Status sbuf;
if (!File::status(tmppath, &sbuf) || !sbuf.isdir) {
db->set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory");
delete[] tmpdbs_;
return false;
}
if (!logf("prepare", "started to open temporary databases under %s", tmppath.c_str()))
err = true;
stime = time();
uint32_t pid = getpid() & UINT16MAX;
uint32_t tid = Thread::hash() & UINT16MAX;
uint32_t ts = time() * 1000;
for (size_t i = 0; i < dbnum_; i++) {
std::string childpath =
strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct",
tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR);
TreeDB* tdb = new TreeDB;
int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR;
if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS;
tdb->tune_options(myopts);
tdb->tune_buckets(DBBNUM);
tdb->tune_page(DBPSIZ);
tdb->tune_map(DBMSIZ);
tdb->tune_page_cache(DBPCCAP);
tdb->tune_comparator(rcomp_);
if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) {
const BasicDB::Error& e = tdb->error();
db->set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
tmpdbs_[i] = tdb;
}
etime = time();
if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime))
err = true;
if (err) {
for (size_t i = 0; i < dbnum_; i++) {
delete tmpdbs_[i];
}
delete[] tmpdbs_;
return false;
}
}
if (opts & XPARARED) redtasks_ = new ReduceTaskQueue;
if (opts & XPARAFLS) flsths_ = new std::deque;
if (opts & XNOLOCK) {
MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, count);
mapvisitor.visit_before();
if (!err) {
BasicDB::Cursor* cur = db->cursor();
if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = true;
while (!err) {
if (!cur->accept(&mapvisitor, false, true)) {
if (cur->error() != BasicDB::Error::NOREC) err = true;
break;
}
}
delete cur;
}
if (mapvisitor.error()) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed");
err = true;
}
mapvisitor.visit_after();
} else if (opts & XPARAMAP) {
MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, count);
rlocks_ = new SlottedMutex(RLOCKSLOT);
if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed");
err = true;
}
delete rlocks_;
rlocks_ = NULL;
if (mapvisitor.error()) err = true;
} else {
MapChecker mapchecker;
MapVisitor mapvisitor(this, &mapchecker, count);
if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true;
if (mapvisitor.error()) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed");
err = true;
}
}
if (flsths_) {
delete flsths_;
flsths_ = NULL;
}
if (redtasks_) {
delete redtasks_;
redtasks_ = NULL;
}
if (!logf("clean", "closing the temporary databases")) err = true;
stime = time();
for (size_t i = 0; i < dbnum_; i++) {
const std::string& path = tmpdbs_[i]->path();
if (!tmpdbs_[i]->clear()) {
const BasicDB::Error& e = tmpdbs_[i]->error();
db->set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
if (!tmpdbs_[i]->close()) {
const BasicDB::Error& e = tmpdbs_[i]->error();
db->set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
if (!tmppath.empty()) File::remove(path);
delete tmpdbs_[i];
}
etime = time();
if (!logf("clean", "closing the temporary databases finished: time=%.6f",
etime - stime)) err = true;
delete[] tmpdbs_;
return !err;
}
/**
* Set the storage configurations.
* @param dbnum the number of temporary databases.
* @param clim the limit size of the internal cache.
* @param cbnum the bucket number of the internal cache.
*/
void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) {
_assert_(true);
dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM;
if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM;
clim_ = clim > 0 ? clim : DEFCLIM;
cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM;
if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_);
}
/**
* Set the thread configurations.
* @param mapthnum the number of threads for the mapper.
* @param redthnum the number of threads for the reducer.
* @param flsthnum the number of threads for the internal flusher.
*/
void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) {
_assert_(true);
mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM;
redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM;
flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM;
}
protected:
/**
* Emit a record from the mapper.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
*/
bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
bool err = false;
size_t rsiz = sizevarnum(vsiz) + vsiz;
char stack[NUMBUFSIZ*4];
char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
char* wp = rbuf;
wp += writevarnum(rbuf, vsiz);
std::memcpy(wp, vbuf, vsiz);
if (rlocks_) {
size_t bidx = TinyHashMap::hash_record(kbuf, ksiz) % cbnum_;
size_t lidx = bidx % RLOCKSLOT;
rlocks_->lock(lidx);
cache_->append(kbuf, ksiz, rbuf, rsiz);
rlocks_->unlock(lidx);
} else {
cache_->append(kbuf, ksiz, rbuf, rsiz);
}
if (rbuf != stack) delete[] rbuf;
csiz_ += sizevarnum(ksiz) + ksiz + rsiz;
return !err;
}
private:
/**
* Cache flusher.
*/
class FlushThread : public Thread {
public:
/** constructor */
explicit FlushThread(MapReduce* mr, BasicDB* tmpdb,
TinyHashMap* cache, size_t csiz, bool cown) :
mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), err_(false) {}
/** perform the concrete process */
void run() {
if (!mr_->logf("map", "started to flushing the cache: count=%lld size=%lld",
(long long)cache_->count(), (long long)csiz_)) err_ = true;
double stime = time();
BasicDB* tmpdb = tmpdb_;
TinyHashMap* cache = cache_;
bool cown = cown_;
TinyHashMap::Sorter sorter(cache);
const char* kbuf, *vbuf;
size_t ksiz, vsiz;
while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) {
const BasicDB::Error& e = tmpdb->error();
mr_->db_->set_error(_KCCODELINE_, e.code(), e.message());
err_ = true;
}
sorter.step();
if (cown) cache->remove(kbuf, ksiz);
}
double etime = time();
if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime - stime))
err_ = true;
if (cown) delete cache;
}
/** check the error flag. */
bool error() {
return err_;
}
private:
MapReduce* mr_; ///< driver
BasicDB* tmpdb_; ///< temprary database
TinyHashMap* cache_; ///< cache for emitter
size_t csiz_; ///< current cache size
bool cown_; ///< cache ownership flag
bool err_; ///< error flag
};
/**
* Task queue for parallel reducer.
*/
class ReduceTaskQueue : public TaskQueue {
public:
/**
* Task for parallel reducer.
*/
class ReduceTask : public Task {
friend class ReduceTaskQueue;
public:
/** constructor */
explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, const Values& values) :
mr_(mr), key_(kbuf, ksiz), values_(values) {}
private:
MapReduce* mr_; ///< driver
std::string key_; ///< key
Values values_; ///< values
};
/** constructor */
explicit ReduceTaskQueue() {}
private:
/** process a task */
void do_task(Task* task) {
ReduceTask* rtask = (ReduceTask*)task;
ValueIterator iter(rtask->values_.begin(), rtask->values_.end());
if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter))
rtask->mr_->redaborted_ = true;
delete rtask;
}
};
/**
* Checker for the map process.
*/
class MapChecker : public BasicDB::ProgressChecker {
public:
/** constructor */
explicit MapChecker() : stop_(false) {}
/** stop the process */
void stop() {
stop_ = true;
}
/** check whether stopped */
bool stopped() {
return stop_;
}
private:
/** check whether stopped */
bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) {
return !stop_;
}
bool stop_; ///< flag for stop
};
/**
* Visitor for the map process.
*/
class MapVisitor : public BasicDB::Visitor {
public:
/** constructor */
explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) :
mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {}
/** get the error flag */
bool error() {
return err_;
}
/** preprocess the mappter */
void visit_before() {
mr_->dbclock_ = 0;
mr_->cache_ = new TinyHashMap(mr_->cbnum_);
mr_->csiz_ = 0;
if (!mr_->preprocess()) err_ = true;
if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
if (!mr_->logf("map", "started the map process: scale=%lld", (long long)scale_))
err_ = true;
stime_ = time();
}
/** postprocess the mappter and call the reducer */
void visit_after() {
if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
double etime = time();
if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_))
err_ = true;
if (!mr_->midprocess()) err_ = true;
if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
delete mr_->cache_;
if (mr_->flsths_ && !mr_->flsths_->empty()) {
std::deque::iterator flthit = mr_->flsths_->begin();
std::deque::iterator flthitend = mr_->flsths_->end();
while (flthit != flthitend) {
FlushThread* flth = *flthit;
flth->join();
if (flth->error()) err_ = true;
delete flth;
++flthit;
}
}
if (!err_ && !mr_->execute_reduce()) err_ = true;
if (!mr_->postprocess()) err_ = true;
}
private:
/** visit a record */
const char* visit_full(const char* kbuf, size_t ksiz,
const char* vbuf, size_t vsiz, size_t* sp) {
if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) {
checker_->stop();
err_ = true;
}
if (mr_->rlocks_) {
if (mr_->csiz_ >= mr_->clim_) {
mr_->rlocks_->lock_all();
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
checker_->stop();
err_ = true;
}
mr_->rlocks_->unlock_all();
}
} else {
if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
checker_->stop();
err_ = true;
}
}
return NOP;
}
MapReduce* mr_; ///< driver
MapChecker* checker_; ///< checker
int64_t scale_; ///< number of records
double stime_; ///< start time
bool err_; ///< error flag
};
/**
* Front line of a merging list.
*/
struct MergeLine {
BasicDB::Cursor* cur; ///< cursor
Comparator* rcomp; ///< record comparator
char* kbuf; ///< pointer to the key
size_t ksiz; ///< size of the key
const char* vbuf; ///< pointer to the value
size_t vsiz; ///< size of the value
/** comparing operator */
bool operator <(const MergeLine& right) const {
return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0;
}
};
/**
* Process a log message.
* @param name the name of the event.
* @param format the printf-like format string.
* @param ... used according to the format string.
* @return true on success, or false on failure.
*/
bool logf(const char* name, const char* format, ...) {
_assert_(name && format);
va_list ap;
va_start(ap, format);
std::string message;
vstrprintf(&message, format, ap);
va_end(ap);
return log(name, message.c_str());
}
/**
* Flush all cache records.
* @return true on success, or false on failure.
*/
bool flush_cache() {
_assert_(true);
bool err = false;
BasicDB* tmpdb = tmpdbs_[dbclock_];
dbclock_ = (dbclock_ + 1) % dbnum_;
if (flsths_) {
size_t num = flsths_->size();
if (num >= flsthnum_ || num >= dbnum_) {
FlushThread* flth = flsths_->front();
flsths_->pop_front();
flth->join();
if (flth->error()) err = true;
delete flth;
}
FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true);
cache_ = new TinyHashMap(cbnum_);
csiz_ = 0;
flth->start();
flsths_->push_back(flth);
} else {
FlushThread flth(this, tmpdb, cache_, csiz_, false);
flth.run();
if (flth.error()) err = true;
cache_->clear();
csiz_ = 0;
}
return !err;
}
/**
* Execute the reduce part.
* @return true on success, or false on failure.
*/
bool execute_reduce() {
bool err = false;
int64_t scale = 0;
for (size_t i = 0; i < dbnum_; i++) {
scale += tmpdbs_[i]->count();
}
if (!logf("reduce", "started the reduce process: scale=%lld", (long long)scale)) err = true;
double stime = time();
if (redtasks_) redtasks_->start(redthnum_);
std::priority_queue lines;
for (size_t i = 0; i < dbnum_; i++) {
MergeLine line;
line.cur = tmpdbs_[i]->cursor();
line.rcomp = rcomp_;
line.cur->jump();
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) {
lines.push(line);
} else {
delete line.cur;
}
}
char* lkbuf = NULL;
size_t lksiz = 0;
Values values;
while (!err && !lines.empty()) {
MergeLine line = lines.top();
lines.pop();
if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lksiz))) {
if (!call_reducer(lkbuf, lksiz, values)) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed");
err = true;
}
values.clear();
}
delete[] lkbuf;
lkbuf = line.kbuf;
lksiz = line.ksiz;
values.push_back(std::string(line.vbuf, line.vsiz));
line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
if (line.kbuf) {
lines.push(line);
} else {
delete line.cur;
}
}
if (lkbuf) {
if (!err && !call_reducer(lkbuf, lksiz, values)) {
db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed");
err = true;
}
delete[] lkbuf;
}
while (!lines.empty()) {
MergeLine line = lines.top();
lines.pop();
delete[] line.kbuf;
delete line.cur;
}
if (redtasks_) redtasks_->finish();
double etime = time();
if (!logf("reduce", "the reduce process finished: time=%.6f", etime - stime)) err = true;
return !err;
}
/**
* Call the reducer.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param values a vector of the values.
* @return true on success, or false on failure.
*/
bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
if (redtasks_) {
if (redaborted_) return false;
ReduceTaskQueue::ReduceTask* task =
new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values);
redtasks_->add_task(task);
return true;
}
bool err = false;
ValueIterator iter(values.begin(), values.end());
if (!reduce(kbuf, ksiz, &iter)) err = true;
return !err;
}
/** Dummy constructor to forbid the use. */
MapReduce(const MapReduce&);
/** Dummy Operator to forbid the use. */
MapReduce& operator =(const MapReduce&);
/** The internal database. */
BasicDB* db_;
/** The record comparator. */
Comparator* rcomp_;
/** The temporary databases. */
BasicDB** tmpdbs_;
/** The number of temporary databases. */
size_t dbnum_;
/** The logical clock for temporary databases. */
int64_t dbclock_;
/** The number of the mapper threads. */
size_t mapthnum_;
/** The number of the reducer threads. */
size_t redthnum_;
/** The number of the flusher threads. */
size_t flsthnum_;
/** The cache for emitter. */
TinyHashMap* cache_;
/** The current size of the cache for emitter. */
AtomicInt64 csiz_;
/** The limit size of the cache for emitter. */
int64_t clim_;
/** The bucket number of the cache for emitter. */
int64_t cbnum_;
/** The flush threads. */
std::deque* flsths_;
/** The task queue for parallel reducer. */
TaskQueue* redtasks_;
/** The flag whether aborted. */
bool redaborted_;
/** The whole lock. */
SlottedMutex* rlocks_;
};
/**
* Index database.
* @note This class is designed to implement an indexing storage with an efficient appending
* operation for the existing record values. This class is a wrapper of the polymorphic
* database, featuring buffering mechanism to alleviate IO overhead in the database layer. This
* class can be inherited but overwriting methods is forbidden. Before every database operation,
* it is necessary to call the IndexDB::open method in order to open a database file and connect
* the database object to it. To avoid data missing or corruption, it is important to close
* every database file by the IndexDB::close method when the database is no longer in use. It
* is forbidden for multible database objects in a process to open the same database at the same
* time. It is forbidden to share a database object with child processes.
*/
class IndexDB {
private:
/** The default number of temporary databases. */
static const size_t DEFDBNUM = 8;
/** The maxinum number of temporary databases. */
static const size_t MAXDBNUM = 256;
/** The default cache limit size. */
static const int64_t DEFCLIM = 256LL << 20;
/** The default cache bucket number. */
static const int64_t DEFCBNUM = 1048583LL;
/** The bucket number of temprary databases. */
static const int64_t DBBNUM = 512LL << 10;
/** The page size of temprary databases. */
static const int32_t DBPSIZ = 32768;
/** The mapped size of temprary databases. */
static const int64_t DBMSIZ = 516LL * 4096;
/** The page cache capacity of temprary databases. */
static const int64_t DBPCCAP = 16LL << 20;
public:
/**
* Default constructor.
*/
explicit IndexDB() :
db_(), omode_(0),
rcomp_(NULL), tmppath_(""), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0),
cache_(NULL), csiz_(0), clim_(0) {
_assert_(true);
}
/**
* Destructor.
* @note If the database is not closed, it is closed implicitly.
*/
virtual ~IndexDB() {
_assert_(true);
if (omode_ != 0) close();
}
/**
* Get the last happened error.
* @return the last happened error.
*/
BasicDB::Error error() const {
_assert_(true);
return db_.error();
}
/**
* Set the error information.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param code an error code.
* @param message a supplement message.
*/
void set_error(const char* file, int32_t line, const char* func,
BasicDB::Error::Code code, const char* message) {
_assert_(file && line > 0 && func && message);
db_.set_error(file, line, func, code, message);
}
/**
* Set the error information without source code information.
* @param code an error code.
* @param message a supplement message.
*/
void set_error(BasicDB::Error::Code code, const char* message) {
_assert_(message);
db_.set_error(_KCCODELINE_, code, message);
}
/**
* Open a database file.
* @param path the path of a database file. The same as with PolyDB. In addition, the
* following tuning parameters are supported. "idxclim" specifies the limit size of the
* internal cache. "idxcbnum" the bucket number of the internal cache. "idxdbnum" specifies
* the number of internal databases. "idxtmppath' specifies the path of the temporary
* directory.
* @param mode the connection mode. The same as with PolyDB.
* @return true on success, or false on failure.
* @note Every opened database must be closed by the IndexDB::close method when it is no longer
* in use. It is not allowed for two or more database objects in the same process to keep
* their connections to the same database file at the same time.
*/
bool open(const std::string& path = ":",
uint32_t mode = BasicDB::OWRITER | BasicDB::OCREATE) {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ != 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "already opened");
return false;
}
std::vector elems;
strsplit(path, '#', &elems);
int64_t clim = 0;
int64_t cbnum = 0;
size_t dbnum = 0;
std::string tmppath = "";
std::vector::iterator it = elems.begin();
std::vector::iterator itend = elems.end();
if (it != itend) ++it;
while (it != itend) {
std::vector fields;
if (strsplit(*it, '=', &fields) > 1) {
const char* key = fields[0].c_str();
const char* value = fields[1].c_str();
if (!std::strcmp(key, "idxclim") || !std::strcmp(key, "idxcachelimit")) {
clim = atoix(value);
} else if (!std::strcmp(key, "idxcbnum") || !std::strcmp(key, "idxcachebuckets")) {
cbnum = atoix(value);
} else if (!std::strcmp(key, "idxdbnum")) {
dbnum = atoix(value);
} else if (!std::strcmp(key, "idxtmppath")) {
tmppath = value;
}
}
++it;
}
if (!db_.open(path, mode)) return false;
tmppath_ = tmppath;
rcomp_ = LEXICALCOMP;
BasicDB* idb = &db_;
if (typeid(db_) == typeid(PolyDB)) {
PolyDB* pdb = (PolyDB*)idb;
idb = pdb->reveal_inner_db();
}
const std::type_info& info = typeid(*idb);
if (info == typeid(GrassDB)) {
GrassDB* gdb = (GrassDB*)idb;
rcomp_ = gdb->rcomp();
} else if (info == typeid(TreeDB)) {
TreeDB* tdb = (TreeDB*)idb;
rcomp_ = tdb->rcomp();
} else if (info == typeid(ForestDB)) {
ForestDB* fdb = (ForestDB*)idb;
rcomp_ = fdb->rcomp();
}
dbnum_ = dbnum < MAXDBNUM ? dbnum : MAXDBNUM;
dbclock_ = 0;
if ((mode & BasicDB::OWRITER) && dbnum > 0) {
tmpdbs_ = new BasicDB*[dbnum_];
if (tmppath_.empty()) {
report(_KCCODELINE_, "started to open temporary databases on memory");
double stime = time();
for (size_t i = 0; i < dbnum_; i++) {
GrassDB* gdb = new GrassDB;
gdb->tune_options(GrassDB::TCOMPRESS);
gdb->tune_buckets(DBBNUM / 2);
gdb->tune_page(DBPSIZ);
gdb->tune_page_cache(DBPCCAP);
gdb->tune_comparator(rcomp_);
gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE);
tmpdbs_[i] = gdb;
}
double etime = time();
report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime);
} else {
File::Status sbuf;
if (!File::status(tmppath_, &sbuf) || !sbuf.isdir) {
set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory");
delete[] tmpdbs_;
db_.close();
return false;
}
report(_KCCODELINE_, "started to open temporary databases under %s", tmppath.c_str());
double stime = time();
uint32_t pid = getpid() & UINT16MAX;
uint32_t tid = Thread::hash() & UINT16MAX;
uint32_t ts = time() * 1000;
bool err = false;
for (size_t i = 0; i < dbnum_; i++) {
std::string childpath =
strprintf("%s%cidx-%04x-%04x-%08x-%03d%ckct",
tmppath_.c_str(), File::PATHCHR, pid, tid, ts,
(int)(i + 1), File::EXTCHR);
TreeDB* tdb = new TreeDB;
tdb->tune_options(TreeDB::TSMALL | TreeDB::TLINEAR);
tdb->tune_buckets(DBBNUM);
tdb->tune_page(DBPSIZ);
tdb->tune_map(DBMSIZ);
tdb->tune_page_cache(DBPCCAP);
tdb->tune_comparator(rcomp_);
if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) {
const BasicDB::Error& e = tdb->error();
set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
tmpdbs_[i] = tdb;
}
double etime = time();
report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime);
if (err) {
for (size_t i = 0; i < dbnum_; i++) {
delete tmpdbs_[i];
}
delete[] tmpdbs_;
db_.close();
return false;
}
}
} else {
tmpdbs_ = NULL;
}
if (mode & BasicDB::OWRITER) {
cache_ = new TinyHashMap(cbnum > 0 ? cbnum : DEFCBNUM);
} else {
cache_ = NULL;
}
clim_ = clim > 0 ? clim : DEFCLIM;
csiz_ = 0;
omode_ = mode;
return true;
}
/**
* Close the database file.
* @return true on success, or false on failure.
*/
bool close() {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
bool err = false;
if (cache_) {
if (!flush_cache()) err = true;
delete cache_;
if (tmpdbs_) {
if (!merge_tmpdbs()) err = true;
report(_KCCODELINE_, "closing the temporary databases");
double stime = time();
for (size_t i = 0; i < dbnum_; i++) {
BasicDB* tmpdb = tmpdbs_[i];
const std::string& path = tmpdb->path();
if (!tmpdb->close()) {
const BasicDB::Error& e = tmpdb->error();
set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
if (!tmppath_.empty()) File::remove(path);
delete tmpdb;
}
double etime = time();
report(_KCCODELINE_, "closing the temporary databases finished: %.6f", etime - stime);
delete[] tmpdbs_;
}
}
if (!db_.close()) err = true;
omode_ = 0;
return !err;
}
/**
* Set the value of a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
* @note If no record corresponds to the key, a new record is created. If the corresponding
* record exists, the value is overwritten.
*/
bool set(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
if (!cache_) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
return false;
}
bool err = false;
if (!clean_dbs(kbuf, ksiz)) err = true;
cache_->set(kbuf, ksiz, vbuf, vsiz);
csiz_ += ksiz + vsiz;
if (csiz_ > clim_ && !flush_cache()) err = false;
return !err;
}
/**
* Set the value of a record.
* @note Equal to the original DB::set method except that the parameters are std::string.
*/
bool set(const std::string& key, const std::string& value) {
_assert_(true);
return set(key.c_str(), key.size(), value.c_str(), value.size());
}
/**
* Add a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
* @note If no record corresponds to the key, a new record is created. If the corresponding
* record exists, the record is not modified and false is returned.
*/
bool add(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
if (!cache_) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
return false;
}
if (check_impl(kbuf, ksiz)) {
set_error(_KCCODELINE_, BasicDB::Error::DUPREC, "record duplication");
return false;
}
bool err = false;
cache_->set(kbuf, ksiz, vbuf, vsiz);
csiz_ += ksiz + vsiz;
if (csiz_ > clim_ && !flush_cache()) err = false;
return !err;
}
/**
* Set the value of a record.
* @note Equal to the original DB::add method except that the parameters are std::string.
*/
bool add(const std::string& key, const std::string& value) {
_assert_(true);
return add(key.c_str(), key.size(), value.c_str(), value.size());
}
/**
* Replace the value of a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
* @note If no record corresponds to the key, no new record is created and false is returned.
* If the corresponding record exists, the value is modified.
*/
bool replace(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
if (!cache_) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
return false;
}
if (!check_impl(kbuf, ksiz)) {
set_error(_KCCODELINE_, BasicDB::Error::NOREC, "no record");
return false;
}
bool err = false;
if (!clean_dbs(kbuf, ksiz)) err = true;
cache_->set(kbuf, ksiz, vbuf, vsiz);
csiz_ += ksiz + vsiz;
if (csiz_ > clim_ && !flush_cache()) err = false;
return !err;
}
/**
* Replace the value of a record.
* @note Equal to the original DB::replace method except that the parameters are std::string.
*/
bool replace(const std::string& key, const std::string& value) {
_assert_(true);
return replace(key.c_str(), key.size(), value.c_str(), value.size());
}
/**
* Append the value of a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param vbuf the pointer to the value region.
* @param vsiz the size of the value region.
* @return true on success, or false on failure.
* @note If no record corresponds to the key, a new record is created. If the corresponding
* record exists, the given value is appended at the end of the existing value.
*/
bool append(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
if (!cache_) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
return false;
}
bool err = false;
cache_->append(kbuf, ksiz, vbuf, vsiz);
csiz_ += ksiz + vsiz;
if (csiz_ > clim_ && !flush_cache()) err = false;
return !err;
}
/**
* Set the value of a record.
* @note Equal to the original DB::append method except that the parameters are std::string.
*/
bool append(const std::string& key, const std::string& value) {
_assert_(true);
return append(key.c_str(), key.size(), value.c_str(), value.size());
}
/**
* Remove a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return true on success, or false on failure.
* @note If no record corresponds to the key, false is returned.
*/
bool remove(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
if (!cache_) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
return false;
}
bool err = false;
if (!clean_dbs(kbuf, ksiz)) err = true;
cache_->remove(kbuf, ksiz);
return !err;
}
/**
* Remove a record.
* @note Equal to the original DB::remove method except that the parameter is std::string.
*/
bool remove(const std::string& key) {
_assert_(true);
return remove(key.c_str(), key.size());
}
/**
* Retrieve the value of a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param sp the pointer to the variable into which the size of the region of the return
* value is assigned.
* @return the pointer to the value region of the corresponding record, or NULL on failure.
* @note If no record corresponds to the key, NULL is returned. Because an additional zero
* code is appended at the end of the region of the return value, the return value can be
* treated as a C-style string. Because the region of the return value is allocated with the
* the new[] operator, it should be released with the delete[] operator when it is no longer
* in use.
*/
char* get(const char* kbuf, size_t ksiz, size_t* sp) {
_assert_(kbuf && ksiz <= MEMMAXSIZ && sp);
// ScopedRWLock lock(&mlock_, false);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
*sp = 0;
return false;
}
if (!cache_) return db_.get(kbuf, ksiz, sp);
size_t dvsiz = 0;
char* dvbuf = db_.get(kbuf, ksiz, &dvsiz);
size_t cvsiz = 0;
const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz);
struct Record {
char* buf;
size_t size;
};
Record* recs = NULL;
bool hit = false;
size_t rsiz = 0;
if (tmpdbs_) {
recs = new Record[dbnum_];
for (size_t i = 0; i < dbnum_; i++) {
BasicDB* tmpdb = tmpdbs_[i];
Record* rp = recs + i;
rp->buf = tmpdb->get(kbuf, ksiz, &rp->size);
if (rp->buf) {
rsiz += rp->size;
hit = true;
}
}
}
if (!hit) {
delete[] recs;
if (!dvbuf && !cvbuf) {
*sp = 0;
return NULL;
}
if (!dvbuf) {
dvbuf = new char[cvsiz+1];
std::memcpy(dvbuf, cvbuf, cvsiz);
*sp = cvsiz;
return dvbuf;
}
if (!cvbuf) {
*sp = dvsiz;
return dvbuf;
}
char* rbuf = new char[dvsiz+cvsiz+1];
std::memcpy(rbuf, dvbuf, dvsiz);
std::memcpy(rbuf + dvsiz, cvbuf, cvsiz);
delete[] dvbuf;
*sp = dvsiz + cvsiz;
return rbuf;
}
if (dvbuf) rsiz += dvsiz;
if (cvbuf) rsiz += cvsiz;
char* rbuf = new char[rsiz+1];
char* wp = rbuf;
if (dvbuf) {
std::memcpy(wp, dvbuf, dvsiz);
wp += dvsiz;
delete[] dvbuf;
}
if (cvbuf) {
std::memcpy(wp, cvbuf, cvsiz);
wp += cvsiz;
}
if (recs) {
for (size_t i = 0; i < dbnum_; i++) {
Record* rp = recs + i;
if (rp->buf) {
std::memcpy(wp, rp->buf, rp->size);
wp += rp->size;
delete[] rp->buf;
}
}
delete[] recs;
}
*sp = rsiz;
return rbuf;
}
/**
* Retrieve the value of a record.
* @note Equal to the original DB::get method except that the first parameters is the key
* string and the second parameter is a string to contain the result and the return value is
* bool for success.
*/
bool get(const std::string& key, std::string* value) {
_assert_(value);
size_t vsiz;
char* vbuf = get(key.c_str(), key.size(), &vsiz);
if (!vbuf) return false;
value->clear();
value->append(vbuf, vsiz);
delete[] vbuf;
return true;
}
/**
* Synchronize updated contents with the file and the device.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @param proc a postprocessor object. If it is NULL, no postprocessing is performed.
* @return true on success, or false on failure.
* @note The operation of the postprocessor is performed atomically and other threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool synchronize(bool hard = false, BasicDB::FileProcessor* proc = NULL) {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
if (!cache_) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
return false;
}
bool err = false;
if (!flush_cache()) err = true;
if (tmpdbs_ && !merge_tmpdbs()) err = true;
if (!db_.synchronize(hard, proc)) err = true;
return !err;
}
/**
* Remove all records.
* @return true on success, or false on failure.
*/
bool clear() {
_assert_(true);
// ScopedRWLock lock(&mlock_, true);
if (omode_ == 0) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
return false;
}
if (!cache_) {
set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
return false;
}
cache_->clear();
csiz_ = 0;
return db_.clear();
}
/**
* Get the number of records.
* @return the number of records, or -1 on failure.
*/
int64_t count() {
_assert_(true);
// ScopedRWLock lock(&mlock_, false);
return count_impl();
}
/**
* Get the size of the database file.
* @return the size of the database file in bytes, or -1 on failure.
*/
int64_t size() {
_assert_(true);
// ScopedRWLock lock(&mlock_, false);
return size_impl();
}
/**
* Get the path of the database file.
* @return the path of the database file, or an empty string on failure.
*/
std::string path() {
_assert_(true);
return db_.path();
}
/**
* Get the miscellaneous status information.
* @param strmap a string map to contain the result.
* @return true on success, or false on failure.
*/
bool status(std::map* strmap) {
_assert_(strmap);
return db_.status(strmap);
}
/**
* Reveal the inner database object.
* @return the inner database object, or NULL on failure.
*/
PolyDB* reveal_inner_db() {
_assert_(true);
return &db_;
}
/**
* Create a cursor object.
* @return the return value is the created cursor object.
* @note Because the object of the return value is allocated by the constructor, it should be
* released with the delete operator when it is no longer in use.
*/
BasicDB::Cursor* cursor() {
_assert_(true);
return db_.cursor();
}
/**
* Write a log message.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param message the supplement message.
*/
void log(const char* file, int32_t line, const char* func, BasicDB::Logger::Kind kind,
const char* message) {
_assert_(file && line > 0 && func && message);
db_.log(file, line, func, kind, message);
}
/**
* Set the internal logger.
* @param logger the logger object.
* @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
* Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal
* error.
* @return true on success, or false on failure.
*/
bool tune_logger(BasicDB::Logger* logger,
uint32_t kinds = BasicDB::Logger::WARN | BasicDB::Logger::ERROR) {
_assert_(logger);
return db_.tune_logger(logger, kinds);
}
/**
* Set the internal meta operation trigger.
* @param trigger the trigger object.
* @return true on success, or false on failure.
*/
bool tune_meta_trigger(BasicDB::MetaTrigger* trigger) {
_assert_(trigger);
return db_.tune_meta_trigger(trigger);
}
protected:
/**
* Report a message for debugging.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param format the printf-like format string.
* @param ... used according to the format string.
*/
void report(const char* file, int32_t line, const char* func, const char* format, ...) {
_assert_(file && line > 0 && func && format);
std::string message;
va_list ap;
va_start(ap, format);
vstrprintf(&message, format, ap);
va_end(ap);
db_.log(file, line, func, BasicDB::Logger::INFO, message.c_str());
}
private:
/**
* Flush all cache records.
* @return true on success, or false on failure.
*/
bool flush_cache() {
_assert_(true);
bool err = false;
double stime = time();
report(_KCCODELINE_, "flushing the cache");
if (tmpdbs_) {
BasicDB* tmpdb = tmpdbs_[dbclock_];
TinyHashMap::Sorter sorter(cache_);
const char* kbuf, *vbuf;
size_t ksiz, vsiz;
while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) {
const BasicDB::Error& e = tmpdb->error();
db_.set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
sorter.step();
}
dbclock_ = (dbclock_ + 1) % dbnum_;
} else {
TinyHashMap::Sorter sorter(cache_);
const char* kbuf, *vbuf;
size_t ksiz, vsiz;
while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
if (!db_.append(kbuf, ksiz, vbuf, vsiz)) err = true;
sorter.step();
}
}
cache_->clear();
csiz_ = 0;
double etime = time();
report(_KCCODELINE_, "flushing the cache finished: time=%.6f", etime - stime);
return !err;
}
/**
* Merge temporary databases.
* @return true on success, or false on failure.
*/
bool merge_tmpdbs() {
_assert_(true);
bool err = false;
report(_KCCODELINE_, "merging the temporary databases");
double stime = time();
if (!db_.merge(tmpdbs_, dbnum_, PolyDB::MAPPEND)) err = true;
dbclock_ = 0;
for (size_t i = 0; i < dbnum_; i++) {
BasicDB* tmpdb = tmpdbs_[i];
if (!tmpdb->clear()) {
const BasicDB::Error& e = tmpdb->error();
set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
}
double etime = time();
report(_KCCODELINE_, "merging the temporary databases finished: %.6f", etime - stime);
return !err;
}
/**
* Remove a record from databases.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return true on success, or false on failure.
*/
bool clean_dbs(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
if (db_.remove(kbuf, ksiz)) return true;
bool err = false;
if (db_.error() != BasicDB::Error::NOREC) err = true;
if (tmpdbs_) {
for (size_t i = 0; i < dbnum_; i++) {
BasicDB* tmpdb = tmpdbs_[i];
if (!tmpdb->remove(kbuf, ksiz)) {
const BasicDB::Error& e = tmpdb->error();
if (e != BasicDB::Error::NOREC) {
set_error(_KCCODELINE_, e.code(), e.message());
err = true;
}
}
}
}
return !err;
}
/**
* Check whether a record exists.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return true if the record exists, or false if not.
*/
bool check_impl(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
char vbuf;
if (db_.get(kbuf, ksiz, &vbuf, 1) >= 0) return true;
if (cache_) {
size_t vsiz;
if (cache_->get(kbuf, ksiz, &vsiz)) return true;
if (tmpdbs_) {
for (size_t i = 0; i < dbnum_; i++) {
BasicDB* tmpdb = tmpdbs_[i];
if (tmpdb->get(kbuf, ksiz, &vbuf, 1)) return true;
}
}
}
return false;
}
/**
* Get the number of records.
* @return the number of records, or -1 on failure.
*/
int64_t count_impl() {
_assert_(true);
int64_t dbcnt = db_.count();
if (dbcnt < 0) return -1;
if (!cache_) return dbcnt;
int64_t ccnt = cache_->count();
return dbcnt > ccnt ? dbcnt : ccnt;
}
/**
* Get the size of the database file.
* @return the size of the database file in bytes.
*/
int64_t size_impl() {
_assert_(true);
int64_t dbsiz = db_.size();
if (dbsiz < 0) return -1;
return dbsiz + csiz_;
}
/** Dummy constructor to forbid the use. */
IndexDB(const IndexDB&);
/** Dummy Operator to forbid the use. */
IndexDB& operator =(const IndexDB&);
/** The internal database. */
PolyDB db_;
/** The open mode. */
uint32_t omode_;
/** The record comparator. */
Comparator* rcomp_;
/** The base path of temporary databases. */
std::string tmppath_;
/** The temporary databases. */
BasicDB** tmpdbs_;
/** The number of temporary databases. */
size_t dbnum_;
/** The logical clock for temporary databases. */
int64_t dbclock_;
/** The internal cache. */
TinyHashMap* cache_;
/** The current size of the internal cache. */
int64_t csiz_;
/** The limit size of the internal cache. */
int64_t clim_;
};
} // common namespace
#endif // duplication check
// END OF FILE