Diffstat (limited to 'plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h')
-rw-r--r--  plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h  1690
1 files changed, 1690 insertions, 0 deletions
diff --git a/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h b/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h
new file mode 100644
index 0000000000..001c09a1d4
--- /dev/null
+++ b/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h
@@ -0,0 +1,1690 @@
+/*************************************************************************************************
+ * Database extension
+ * Copyright (C) 2009-2012 FAL Labs
+ * This file is part of Kyoto Cabinet.
+ * This program is free software: you can redistribute it and/or modify it under the terms of
+ * the GNU General Public License as published by the Free Software Foundation, either version
+ * 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ * You should have received a copy of the GNU General Public License along with this program.
+ * If not, see <http://www.gnu.org/licenses/>.
+ *************************************************************************************************/
+
+
+#ifndef _KCDBEXT_H // duplication check
+#define _KCDBEXT_H
+
+#include <kccommon.h>
+#include <kcutil.h>
+#include <kcthread.h>
+#include <kcfile.h>
+#include <kccompress.h>
+#include <kccompare.h>
+#include <kcmap.h>
+#include <kcregex.h>
+#include <kcdb.h>
+#include <kcplantdb.h>
+#include <kcprotodb.h>
+#include <kcstashdb.h>
+#include <kccachedb.h>
+#include <kchashdb.h>
+#include <kcdirdb.h>
+#include <kcpolydb.h>
+
+namespace kyotocabinet { // common namespace
+
+
+/**
+ * MapReduce framework.
+ * @note Although this framework is neither distributed nor concurrent, it is useful for
+ * aggregate calculation with low CPU load and low memory usage.
+ */
+class MapReduce {
+ public:
+ class ValueIterator;
+ private:
+ class FlushThread;
+ class ReduceTaskQueue;
+ class MapVisitor;
+ struct MergeLine;
+ /** An alias of vector of loaded values. */
+ typedef std::vector<std::string> Values;
+ /** The default number of temporary databases. */
+ static const size_t DEFDBNUM = 8;
+  /** The maximum number of temporary databases. */
+  static const size_t MAXDBNUM = 256;
+  /** The default cache limit. */
+  static const int64_t DEFCLIM = 512LL << 20;
+  /** The default cache bucket number. */
+  static const int64_t DEFCBNUM = 1048583LL;
+  /** The bucket number of temporary databases. */
+  static const int64_t DBBNUM = 512LL << 10;
+  /** The page size of temporary databases. */
+  static const int32_t DBPSIZ = 32768;
+  /** The mapped size of temporary databases. */
+  static const int64_t DBMSIZ = 516LL * 4096;
+  /** The page cache capacity of temporary databases. */
+  static const int64_t DBPCCAP = 16LL << 20;
+ /** The default number of threads in parallel mode. */
+ static const size_t DEFTHNUM = 8;
+ /** The number of slots of the record lock. */
+ static const int32_t RLOCKSLOT = 256;
+ public:
+ /**
+ * Value iterator for the reducer.
+ */
+ class ValueIterator {
+ friend class MapReduce;
+ public:
+ /**
+ * Get the next value.
+ * @param sp the pointer to the variable into which the size of the region of the return
+ * value is assigned.
+ * @return the pointer to the next value region, or NULL if no value remains.
+ */
+ const char* next(size_t* sp) {
+ _assert_(sp);
+ if (!vptr_) {
+ if (vit_ == vend_) return NULL;
+ vptr_ = vit_->data();
+ vsiz_ = vit_->size();
+ vit_++;
+ }
+ uint64_t vsiz;
+ size_t step = readvarnum(vptr_, vsiz_, &vsiz);
+ vptr_ += step;
+ vsiz_ -= step;
+ const char* vbuf = vptr_;
+ *sp = vsiz;
+ vptr_ += vsiz;
+ vsiz_ -= vsiz;
+ if (vsiz_ < 1) vptr_ = NULL;
+ return vbuf;
+ }
+ private:
+ /**
+ * Default constructor.
+ */
+ explicit ValueIterator(Values::const_iterator vit, Values::const_iterator vend) :
+ vit_(vit), vend_(vend), vptr_(NULL), vsiz_(0) {
+ _assert_(true);
+ }
+ /**
+ * Destructor.
+ */
+ ~ValueIterator() {
+ _assert_(true);
+ }
+ /** Dummy constructor to forbid the use. */
+ ValueIterator(const ValueIterator&);
+ /** Dummy Operator to forbid the use. */
+ ValueIterator& operator =(const ValueIterator&);
+ /** The current iterator of loaded values. */
+ Values::const_iterator vit_;
+ /** The ending iterator of loaded values. */
+ Values::const_iterator vend_;
+ /** The pointer of the current value. */
+ const char* vptr_;
+ /** The size of the current value. */
+ size_t vsiz_;
+ };
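+  // Illustrative sketch of how a reducer typically drains this iterator; the local
+  // names vbuf/vsiz below are assumptions for the example, not part of the API:
+  //
+  //   const char* vbuf;
+  //   size_t vsiz;
+  //   while ((vbuf = iter->next(&vsiz)) != NULL) {
+  //     // process the value region referred to by vbuf and vsiz
+  //   }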
+ /**
+ * Execution options.
+ */
+ enum Option {
+ XNOLOCK = 1 << 0, ///< avoid locking against update operations
+ XPARAMAP = 1 << 1, ///< run mappers in parallel
+ XPARARED = 1 << 2, ///< run reducers in parallel
+ XPARAFLS = 1 << 3, ///< run cache flushers in parallel
+ XNOCOMP = 1 << 8 ///< avoid compression of temporary databases
+ };
+ /**
+ * Default constructor.
+ */
+ explicit MapReduce() :
+ db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0),
+ mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM),
+ cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NULL),
+ redtasks_(NULL), redaborted_(false), rlocks_(NULL) {
+ _assert_(true);
+ }
+ /**
+ * Destructor.
+ */
+ virtual ~MapReduce() {
+ _assert_(true);
+ }
+ /**
+ * Map a record data.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param vbuf the pointer to the value region.
+ * @param vsiz the size of the value region.
+ * @return true on success, or false on failure.
+ * @note This function can call the MapReduce::emit method to emit a record. To avoid
+ * deadlock, any explicit database operation must not be performed in this function.
+ */
+ virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) = 0;
+ /**
+ * Reduce a record data.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param iter the iterator to get the values.
+ * @return true on success, or false on failure.
+ * @note To avoid deadlock, any explicit database operation must not be performed in this
+ * function.
+ */
+ virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0;
+ /**
+ * Preprocess the map operations.
+ * @return true on success, or false on failure.
+ * @note This function can call the MapReduce::emit method to emit a record. To avoid
+ * deadlock, any explicit database operation must not be performed in this function.
+ */
+ virtual bool preprocess() {
+ _assert_(true);
+ return true;
+ }
+ /**
+ * Mediate between the map and the reduce phases.
+ * @return true on success, or false on failure.
+ * @note This function can call the MapReduce::emit method to emit a record. To avoid
+ * deadlock, any explicit database operation must not be performed in this function.
+ */
+ virtual bool midprocess() {
+ _assert_(true);
+ return true;
+ }
+ /**
+ * Postprocess the reduce operations.
+ * @return true on success, or false on failure.
+ * @note To avoid deadlock, any explicit database operation must not be performed in this
+ * function.
+ */
+ virtual bool postprocess() {
+ _assert_(true);
+ return true;
+ }
+ /**
+ * Process a log message.
+ * @param name the name of the event.
+ * @param message a supplement message.
+ * @return true on success, or false on failure.
+ */
+ virtual bool log(const char* name, const char* message) {
+ _assert_(name && message);
+ return true;
+ }
+ /**
+ * Execute the MapReduce process about a database.
+ * @param db the source database.
+   * @param tmppath the path of a directory for the temporary data storage. If it is an empty
+   * string, temporary data are handled in memory.
+   * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking
+   * against update operations by other threads, MapReduce::XPARAMAP to run the mapper in
+   * parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduce::XPARAFLS to run
+   * the cache flushers in parallel, and MapReduce::XNOCOMP to avoid compression of temporary
+   * databases.
+ * @return true on success, or false on failure.
+ */
+ bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) {
+ int64_t count = db->count();
+ if (count < 0) {
+ if (db->error() != BasicDB::Error::NOIMPL) return false;
+ count = 0;
+ }
+ bool err = false;
+ double stime, etime;
+ db_ = db;
+ rcomp_ = LEXICALCOMP;
+ BasicDB* idb = db;
+ if (typeid(*db) == typeid(PolyDB)) {
+ PolyDB* pdb = (PolyDB*)idb;
+ idb = pdb->reveal_inner_db();
+ }
+ const std::type_info& info = typeid(*idb);
+ if (info == typeid(GrassDB)) {
+ GrassDB* gdb = (GrassDB*)idb;
+ rcomp_ = gdb->rcomp();
+ } else if (info == typeid(TreeDB)) {
+ TreeDB* tdb = (TreeDB*)idb;
+ rcomp_ = tdb->rcomp();
+ } else if (info == typeid(ForestDB)) {
+ ForestDB* fdb = (ForestDB*)idb;
+ rcomp_ = fdb->rcomp();
+ }
+ tmpdbs_ = new BasicDB*[dbnum_];
+ if (tmppath.empty()) {
+ if (!logf("prepare", "started to open temporary databases on memory")) err = true;
+ stime = time();
+ for (size_t i = 0; i < dbnum_; i++) {
+ GrassDB* gdb = new GrassDB;
+ int32_t myopts = 0;
+ if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS;
+ gdb->tune_options(myopts);
+ gdb->tune_buckets(DBBNUM / 2);
+ gdb->tune_page(DBPSIZ);
+ gdb->tune_page_cache(DBPCCAP);
+ gdb->tune_comparator(rcomp_);
+ gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE);
+ tmpdbs_[i] = gdb;
+ }
+ etime = time();
+ if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime))
+ err = true;
+ if (err) {
+ delete[] tmpdbs_;
+ return false;
+ }
+ } else {
+ File::Status sbuf;
+ if (!File::status(tmppath, &sbuf) || !sbuf.isdir) {
+ db->set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory");
+ delete[] tmpdbs_;
+ return false;
+ }
+ if (!logf("prepare", "started to open temporary databases under %s", tmppath.c_str()))
+ err = true;
+ stime = time();
+ uint32_t pid = getpid() & UINT16MAX;
+ uint32_t tid = Thread::hash() & UINT16MAX;
+ uint32_t ts = time() * 1000;
+ for (size_t i = 0; i < dbnum_; i++) {
+ std::string childpath =
+ strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct",
+ tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR);
+ TreeDB* tdb = new TreeDB;
+ int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR;
+ if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS;
+ tdb->tune_options(myopts);
+ tdb->tune_buckets(DBBNUM);
+ tdb->tune_page(DBPSIZ);
+ tdb->tune_map(DBMSIZ);
+ tdb->tune_page_cache(DBPCCAP);
+ tdb->tune_comparator(rcomp_);
+ if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) {
+ const BasicDB::Error& e = tdb->error();
+ db->set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ tmpdbs_[i] = tdb;
+ }
+ etime = time();
+ if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime))
+ err = true;
+ if (err) {
+ for (size_t i = 0; i < dbnum_; i++) {
+ delete tmpdbs_[i];
+ }
+ delete[] tmpdbs_;
+ return false;
+ }
+ }
+ if (opts & XPARARED) redtasks_ = new ReduceTaskQueue;
+ if (opts & XPARAFLS) flsths_ = new std::deque<FlushThread*>;
+ if (opts & XNOLOCK) {
+ MapChecker mapchecker;
+ MapVisitor mapvisitor(this, &mapchecker, count);
+ mapvisitor.visit_before();
+ if (!err) {
+ BasicDB::Cursor* cur = db->cursor();
+ if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = true;
+ while (!err) {
+ if (!cur->accept(&mapvisitor, false, true)) {
+ if (cur->error() != BasicDB::Error::NOREC) err = true;
+ break;
+ }
+ }
+ delete cur;
+ }
+ if (mapvisitor.error()) {
+ db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed");
+ err = true;
+ }
+ mapvisitor.visit_after();
+ } else if (opts & XPARAMAP) {
+ MapChecker mapchecker;
+ MapVisitor mapvisitor(this, &mapchecker, count);
+ rlocks_ = new SlottedMutex(RLOCKSLOT);
+ if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) {
+ db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed");
+ err = true;
+ }
+ delete rlocks_;
+ rlocks_ = NULL;
+ if (mapvisitor.error()) err = true;
+ } else {
+ MapChecker mapchecker;
+ MapVisitor mapvisitor(this, &mapchecker, count);
+ if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true;
+ if (mapvisitor.error()) {
+ db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed");
+ err = true;
+ }
+ }
+ if (flsths_) {
+ delete flsths_;
+ flsths_ = NULL;
+ }
+ if (redtasks_) {
+ delete redtasks_;
+ redtasks_ = NULL;
+ }
+ if (!logf("clean", "closing the temporary databases")) err = true;
+ stime = time();
+ for (size_t i = 0; i < dbnum_; i++) {
+ const std::string& path = tmpdbs_[i]->path();
+ if (!tmpdbs_[i]->clear()) {
+ const BasicDB::Error& e = tmpdbs_[i]->error();
+ db->set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ if (!tmpdbs_[i]->close()) {
+ const BasicDB::Error& e = tmpdbs_[i]->error();
+ db->set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ if (!tmppath.empty()) File::remove(path);
+ delete tmpdbs_[i];
+ }
+ etime = time();
+ if (!logf("clean", "closing the temporary databases finished: time=%.6f",
+ etime - stime)) err = true;
+ delete[] tmpdbs_;
+ return !err;
+ }
+ /**
+ * Set the storage configurations.
+ * @param dbnum the number of temporary databases.
+ * @param clim the limit size of the internal cache.
+ * @param cbnum the bucket number of the internal cache.
+ */
+ void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) {
+ _assert_(true);
+ dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM;
+ if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM;
+ clim_ = clim > 0 ? clim : DEFCLIM;
+ cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM;
+ if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_);
+ }
+ /**
+ * Set the thread configurations.
+ * @param mapthnum the number of threads for the mapper.
+ * @param redthnum the number of threads for the reducer.
+ * @param flsthnum the number of threads for the internal flusher.
+ */
+ void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) {
+ _assert_(true);
+ mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM;
+ redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM;
+ flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM;
+ }
+ protected:
+ /**
+ * Emit a record from the mapper.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param vbuf the pointer to the value region.
+ * @param vsiz the size of the value region.
+ * @return true on success, or false on failure.
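+   * @note The emitted value is stored in the internal cache as a length-prefixed chunk
+   * appended after any previous values of the same key; the ValueIterator passed to the
+   * reducer later decodes these chunks one by one.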
+ */
+ bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
+ bool err = false;
+ size_t rsiz = sizevarnum(vsiz) + vsiz;
+ char stack[NUMBUFSIZ*4];
+ char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack;
+ char* wp = rbuf;
+ wp += writevarnum(rbuf, vsiz);
+ std::memcpy(wp, vbuf, vsiz);
+ if (rlocks_) {
+ size_t bidx = TinyHashMap::hash_record(kbuf, ksiz) % cbnum_;
+ size_t lidx = bidx % RLOCKSLOT;
+ rlocks_->lock(lidx);
+ cache_->append(kbuf, ksiz, rbuf, rsiz);
+ rlocks_->unlock(lidx);
+ } else {
+ cache_->append(kbuf, ksiz, rbuf, rsiz);
+ }
+ if (rbuf != stack) delete[] rbuf;
+ csiz_ += sizevarnum(ksiz) + ksiz + rsiz;
+ return !err;
+ }
+ private:
+ /**
+ * Cache flusher.
+ */
+ class FlushThread : public Thread {
+ public:
+ /** constructor */
+ explicit FlushThread(MapReduce* mr, BasicDB* tmpdb,
+ TinyHashMap* cache, size_t csiz, bool cown) :
+ mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), err_(false) {}
+ /** perform the concrete process */
+ void run() {
+      if (!mr_->logf("map", "started to flush the cache: count=%lld size=%lld",
+ (long long)cache_->count(), (long long)csiz_)) err_ = true;
+ double stime = time();
+ BasicDB* tmpdb = tmpdb_;
+ TinyHashMap* cache = cache_;
+ bool cown = cown_;
+ TinyHashMap::Sorter sorter(cache);
+ const char* kbuf, *vbuf;
+ size_t ksiz, vsiz;
+ while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
+ if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) {
+ const BasicDB::Error& e = tmpdb->error();
+ mr_->db_->set_error(_KCCODELINE_, e.code(), e.message());
+ err_ = true;
+ }
+ sorter.step();
+ if (cown) cache->remove(kbuf, ksiz);
+ }
+ double etime = time();
+ if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime - stime))
+ err_ = true;
+ if (cown) delete cache;
+ }
+ /** check the error flag. */
+ bool error() {
+ return err_;
+ }
+ private:
+ MapReduce* mr_; ///< driver
+    BasicDB* tmpdb_;                         ///< temporary database
+ TinyHashMap* cache_; ///< cache for emitter
+ size_t csiz_; ///< current cache size
+ bool cown_; ///< cache ownership flag
+ bool err_; ///< error flag
+ };
+ /**
+ * Task queue for parallel reducer.
+ */
+ class ReduceTaskQueue : public TaskQueue {
+ public:
+ /**
+ * Task for parallel reducer.
+ */
+ class ReduceTask : public Task {
+ friend class ReduceTaskQueue;
+ public:
+ /** constructor */
+ explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, const Values& values) :
+ mr_(mr), key_(kbuf, ksiz), values_(values) {}
+ private:
+ MapReduce* mr_; ///< driver
+ std::string key_; ///< key
+ Values values_; ///< values
+ };
+ /** constructor */
+ explicit ReduceTaskQueue() {}
+ private:
+ /** process a task */
+ void do_task(Task* task) {
+ ReduceTask* rtask = (ReduceTask*)task;
+ ValueIterator iter(rtask->values_.begin(), rtask->values_.end());
+ if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter))
+ rtask->mr_->redaborted_ = true;
+ delete rtask;
+ }
+ };
+ /**
+ * Checker for the map process.
+ */
+ class MapChecker : public BasicDB::ProgressChecker {
+ public:
+ /** constructor */
+ explicit MapChecker() : stop_(false) {}
+ /** stop the process */
+ void stop() {
+ stop_ = true;
+ }
+ /** check whether stopped */
+ bool stopped() {
+ return stop_;
+ }
+ private:
+ /** check whether stopped */
+ bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) {
+ return !stop_;
+ }
+ bool stop_; ///< flag for stop
+ };
+ /**
+ * Visitor for the map process.
+ */
+ class MapVisitor : public BasicDB::Visitor {
+ public:
+ /** constructor */
+ explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) :
+ mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {}
+ /** get the error flag */
+ bool error() {
+ return err_;
+ }
+    /** preprocess the mapper */
+ void visit_before() {
+ mr_->dbclock_ = 0;
+ mr_->cache_ = new TinyHashMap(mr_->cbnum_);
+ mr_->csiz_ = 0;
+ if (!mr_->preprocess()) err_ = true;
+ if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
+ if (!mr_->logf("map", "started the map process: scale=%lld", (long long)scale_))
+ err_ = true;
+ stime_ = time();
+ }
+    /** postprocess the mapper and call the reducer */
+ void visit_after() {
+ if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
+ double etime = time();
+ if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_))
+ err_ = true;
+ if (!mr_->midprocess()) err_ = true;
+ if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true;
+ delete mr_->cache_;
+ if (mr_->flsths_ && !mr_->flsths_->empty()) {
+ std::deque<FlushThread*>::iterator flthit = mr_->flsths_->begin();
+ std::deque<FlushThread*>::iterator flthitend = mr_->flsths_->end();
+ while (flthit != flthitend) {
+ FlushThread* flth = *flthit;
+ flth->join();
+ if (flth->error()) err_ = true;
+ delete flth;
+ ++flthit;
+ }
+ }
+ if (!err_ && !mr_->execute_reduce()) err_ = true;
+ if (!mr_->postprocess()) err_ = true;
+ }
+ private:
+ /** visit a record */
+ const char* visit_full(const char* kbuf, size_t ksiz,
+ const char* vbuf, size_t vsiz, size_t* sp) {
+ if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) {
+ checker_->stop();
+ err_ = true;
+ }
+ if (mr_->rlocks_) {
+ if (mr_->csiz_ >= mr_->clim_) {
+ mr_->rlocks_->lock_all();
+ if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
+ checker_->stop();
+ err_ = true;
+ }
+ mr_->rlocks_->unlock_all();
+ }
+ } else {
+ if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) {
+ checker_->stop();
+ err_ = true;
+ }
+ }
+ return NOP;
+ }
+ MapReduce* mr_; ///< driver
+ MapChecker* checker_; ///< checker
+ int64_t scale_; ///< number of records
+ double stime_; ///< start time
+ bool err_; ///< error flag
+ };
+ /**
+ * Front line of a merging list.
+ */
+ struct MergeLine {
+ BasicDB::Cursor* cur; ///< cursor
+ Comparator* rcomp; ///< record comparator
+ char* kbuf; ///< pointer to the key
+ size_t ksiz; ///< size of the key
+ const char* vbuf; ///< pointer to the value
+ size_t vsiz; ///< size of the value
+    /** comparing operator (reversed so that std::priority_queue, a max-heap, pops the smallest key first) */
+ bool operator <(const MergeLine& right) const {
+ return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0;
+ }
+ };
+ /**
+ * Process a log message.
+ * @param name the name of the event.
+ * @param format the printf-like format string.
+ * @param ... used according to the format string.
+ * @return true on success, or false on failure.
+ */
+ bool logf(const char* name, const char* format, ...) {
+ _assert_(name && format);
+ va_list ap;
+ va_start(ap, format);
+ std::string message;
+ vstrprintf(&message, format, ap);
+ va_end(ap);
+ return log(name, message.c_str());
+ }
+ /**
+ * Flush all cache records.
+ * @return true on success, or false on failure.
+ */
+ bool flush_cache() {
+ _assert_(true);
+ bool err = false;
+ BasicDB* tmpdb = tmpdbs_[dbclock_];
+ dbclock_ = (dbclock_ + 1) % dbnum_;
+ if (flsths_) {
+ size_t num = flsths_->size();
+ if (num >= flsthnum_ || num >= dbnum_) {
+ FlushThread* flth = flsths_->front();
+ flsths_->pop_front();
+ flth->join();
+ if (flth->error()) err = true;
+ delete flth;
+ }
+ FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true);
+ cache_ = new TinyHashMap(cbnum_);
+ csiz_ = 0;
+ flth->start();
+ flsths_->push_back(flth);
+ } else {
+ FlushThread flth(this, tmpdb, cache_, csiz_, false);
+ flth.run();
+ if (flth.error()) err = true;
+ cache_->clear();
+ csiz_ = 0;
+ }
+ return !err;
+ }
+ /**
+ * Execute the reduce part.
+ * @return true on success, or false on failure.
+ */
+ bool execute_reduce() {
+ bool err = false;
+ int64_t scale = 0;
+ for (size_t i = 0; i < dbnum_; i++) {
+ scale += tmpdbs_[i]->count();
+ }
+ if (!logf("reduce", "started the reduce process: scale=%lld", (long long)scale)) err = true;
+ double stime = time();
+ if (redtasks_) redtasks_->start(redthnum_);
+ std::priority_queue<MergeLine> lines;
+ for (size_t i = 0; i < dbnum_; i++) {
+ MergeLine line;
+ line.cur = tmpdbs_[i]->cursor();
+ line.rcomp = rcomp_;
+ line.cur->jump();
+ line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
+ if (line.kbuf) {
+ lines.push(line);
+ } else {
+ delete line.cur;
+ }
+ }
+ char* lkbuf = NULL;
+ size_t lksiz = 0;
+ Values values;
+ while (!err && !lines.empty()) {
+ MergeLine line = lines.top();
+ lines.pop();
+ if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lksiz))) {
+ if (!call_reducer(lkbuf, lksiz, values)) {
+ db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed");
+ err = true;
+ }
+ values.clear();
+ }
+ delete[] lkbuf;
+ lkbuf = line.kbuf;
+ lksiz = line.ksiz;
+ values.push_back(std::string(line.vbuf, line.vsiz));
+ line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true);
+ if (line.kbuf) {
+ lines.push(line);
+ } else {
+ delete line.cur;
+ }
+ }
+ if (lkbuf) {
+ if (!err && !call_reducer(lkbuf, lksiz, values)) {
+ db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed");
+ err = true;
+ }
+ delete[] lkbuf;
+ }
+ while (!lines.empty()) {
+ MergeLine line = lines.top();
+ lines.pop();
+ delete[] line.kbuf;
+ delete line.cur;
+ }
+ if (redtasks_) redtasks_->finish();
+ double etime = time();
+ if (!logf("reduce", "the reduce process finished: time=%.6f", etime - stime)) err = true;
+ return !err;
+ }
+ /**
+ * Call the reducer.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param values a vector of the values.
+ * @return true on success, or false on failure.
+ */
+ bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ);
+ if (redtasks_) {
+ if (redaborted_) return false;
+ ReduceTaskQueue::ReduceTask* task =
+ new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values);
+ redtasks_->add_task(task);
+ return true;
+ }
+ bool err = false;
+ ValueIterator iter(values.begin(), values.end());
+ if (!reduce(kbuf, ksiz, &iter)) err = true;
+ return !err;
+ }
+ /** Dummy constructor to forbid the use. */
+ MapReduce(const MapReduce&);
+ /** Dummy Operator to forbid the use. */
+ MapReduce& operator =(const MapReduce&);
+ /** The internal database. */
+ BasicDB* db_;
+ /** The record comparator. */
+ Comparator* rcomp_;
+ /** The temporary databases. */
+ BasicDB** tmpdbs_;
+ /** The number of temporary databases. */
+ size_t dbnum_;
+ /** The logical clock for temporary databases. */
+ int64_t dbclock_;
+ /** The number of the mapper threads. */
+ size_t mapthnum_;
+ /** The number of the reducer threads. */
+ size_t redthnum_;
+ /** The number of the flusher threads. */
+ size_t flsthnum_;
+ /** The cache for emitter. */
+ TinyHashMap* cache_;
+ /** The current size of the cache for emitter. */
+ AtomicInt64 csiz_;
+ /** The limit size of the cache for emitter. */
+ int64_t clim_;
+ /** The bucket number of the cache for emitter. */
+ int64_t cbnum_;
+ /** The flush threads. */
+ std::deque<FlushThread*>* flsths_;
+ /** The task queue for parallel reducer. */
+ TaskQueue* redtasks_;
+ /** The flag whether aborted. */
+ bool redaborted_;
+  /** The slotted record locks. */
+ SlottedMutex* rlocks_;
+};
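+
+/*
+ * A minimal usage sketch of the MapReduce framework. The class name, file name, and the
+ * aggregation shown here are illustrative assumptions, not part of the library:
+ *
+ *   #include <kcpolydb.h>
+ *   #include <kcdbext.h>
+ *
+ *   class ValueSizeCounter : public kyotocabinet::MapReduce {
+ *     // emit one record per input record: the key mapped to the size of its value
+ *     bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
+ *       return emit(kbuf, ksiz, (const char*)&vsiz, sizeof(vsiz));
+ *     }
+ *     // sum the emitted sizes for each key
+ *     bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) {
+ *       const char* vbuf;
+ *       size_t vsiz, total = 0;
+ *       while ((vbuf = iter->next(&vsiz)) != NULL) {
+ *         size_t num;
+ *         std::memcpy(&num, vbuf, sizeof(num));
+ *         total += num;
+ *       }
+ *       // total now holds the aggregate for this key; store or report it as needed
+ *       return true;
+ *     }
+ *   };
+ *
+ *   // kyotocabinet::PolyDB db;
+ *   // db.open("casket.kch", kyotocabinet::PolyDB::OREADER);
+ *   // ValueSizeCounter mr;
+ *   // mr.tune_thread(4, 4, 4);
+ *   // mr.execute(&db, "", kyotocabinet::MapReduce::XPARAMAP | kyotocabinet::MapReduce::XPARARED);
+ *   // db.close();
+ */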
+
+
+/**
+ * Index database.
+ * @note This class is designed to implement an indexing storage with an efficient appending
+ * operation for existing record values. This class is a wrapper of the polymorphic
+ * database, featuring a buffering mechanism to alleviate I/O overhead in the database layer.
+ * This class can be inherited but overriding methods is forbidden. Before every database
+ * operation, it is necessary to call the IndexDB::open method in order to open a database file
+ * and connect the database object to it. To avoid data loss or corruption, it is important to
+ * close every database file with the IndexDB::close method when the database is no longer in
+ * use. It is forbidden for multiple database objects in a process to open the same database at
+ * the same time. It is forbidden to share a database object with child processes.
+ */
+class IndexDB {
+ private:
+ /** The default number of temporary databases. */
+ static const size_t DEFDBNUM = 8;
+  /** The maximum number of temporary databases. */
+  static const size_t MAXDBNUM = 256;
+  /** The default cache limit size. */
+  static const int64_t DEFCLIM = 256LL << 20;
+  /** The default cache bucket number. */
+  static const int64_t DEFCBNUM = 1048583LL;
+  /** The bucket number of temporary databases. */
+  static const int64_t DBBNUM = 512LL << 10;
+  /** The page size of temporary databases. */
+  static const int32_t DBPSIZ = 32768;
+  /** The mapped size of temporary databases. */
+  static const int64_t DBMSIZ = 516LL * 4096;
+  /** The page cache capacity of temporary databases. */
+  static const int64_t DBPCCAP = 16LL << 20;
+ public:
+ /**
+ * Default constructor.
+ */
+ explicit IndexDB() :
+ mlock_(), db_(), omode_(0),
+ rcomp_(NULL), tmppath_(""), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0),
+ cache_(NULL), csiz_(0), clim_(0) {
+ _assert_(true);
+ }
+ /**
+ * Destructor.
+ * @note If the database is not closed, it is closed implicitly.
+ */
+ virtual ~IndexDB() {
+ _assert_(true);
+ if (omode_ != 0) close();
+ }
+ /**
+ * Get the last happened error.
+ * @return the last happened error.
+ */
+ BasicDB::Error error() const {
+ _assert_(true);
+ return db_.error();
+ }
+ /**
+ * Set the error information.
+ * @param file the file name of the program source code.
+ * @param line the line number of the program source code.
+ * @param func the function name of the program source code.
+ * @param code an error code.
+ * @param message a supplement message.
+ */
+ void set_error(const char* file, int32_t line, const char* func,
+ BasicDB::Error::Code code, const char* message) {
+ _assert_(file && line > 0 && func && message);
+ db_.set_error(file, line, func, code, message);
+ }
+ /**
+ * Set the error information without source code information.
+ * @param code an error code.
+ * @param message a supplement message.
+ */
+ void set_error(BasicDB::Error::Code code, const char* message) {
+ _assert_(message);
+ db_.set_error(_KCCODELINE_, code, message);
+ }
+ /**
+ * Open a database file.
+ * @param path the path of a database file. The same as with PolyDB. In addition, the
+   * following tuning parameters are supported. "idxclim" specifies the limit size of the
+   * internal cache. "idxcbnum" specifies the bucket number of the internal cache. "idxdbnum"
+   * specifies the number of internal databases. "idxtmppath" specifies the path of the
+   * temporary directory.
+ * @param mode the connection mode. The same as with PolyDB.
+ * @return true on success, or false on failure.
+ * @note Every opened database must be closed by the IndexDB::close method when it is no longer
+ * in use. It is not allowed for two or more database objects in the same process to keep
+ * their connections to the same database file at the same time.
+ */
+ bool open(const std::string& path = ":",
+ uint32_t mode = BasicDB::OWRITER | BasicDB::OCREATE) {
+ _assert_(true);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ != 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "already opened");
+ return false;
+ }
+ std::vector<std::string> elems;
+ strsplit(path, '#', &elems);
+ int64_t clim = 0;
+ int64_t cbnum = 0;
+ size_t dbnum = 0;
+ std::string tmppath = "";
+ std::vector<std::string>::iterator it = elems.begin();
+ std::vector<std::string>::iterator itend = elems.end();
+ if (it != itend) ++it;
+ while (it != itend) {
+ std::vector<std::string> fields;
+ if (strsplit(*it, '=', &fields) > 1) {
+ const char* key = fields[0].c_str();
+ const char* value = fields[1].c_str();
+ if (!std::strcmp(key, "idxclim") || !std::strcmp(key, "idxcachelimit")) {
+ clim = atoix(value);
+ } else if (!std::strcmp(key, "idxcbnum") || !std::strcmp(key, "idxcachebuckets")) {
+ cbnum = atoix(value);
+ } else if (!std::strcmp(key, "idxdbnum")) {
+ dbnum = atoix(value);
+ } else if (!std::strcmp(key, "idxtmppath")) {
+ tmppath = value;
+ }
+ }
+ ++it;
+ }
+ if (!db_.open(path, mode)) return false;
+ tmppath_ = tmppath;
+ rcomp_ = LEXICALCOMP;
+ BasicDB* idb = &db_;
+ if (typeid(db_) == typeid(PolyDB)) {
+ PolyDB* pdb = (PolyDB*)idb;
+ idb = pdb->reveal_inner_db();
+ }
+ const std::type_info& info = typeid(*idb);
+ if (info == typeid(GrassDB)) {
+ GrassDB* gdb = (GrassDB*)idb;
+ rcomp_ = gdb->rcomp();
+ } else if (info == typeid(TreeDB)) {
+ TreeDB* tdb = (TreeDB*)idb;
+ rcomp_ = tdb->rcomp();
+ } else if (info == typeid(ForestDB)) {
+ ForestDB* fdb = (ForestDB*)idb;
+ rcomp_ = fdb->rcomp();
+ }
+ dbnum_ = dbnum < MAXDBNUM ? dbnum : MAXDBNUM;
+ dbclock_ = 0;
+ if ((mode & BasicDB::OWRITER) && dbnum > 0) {
+ tmpdbs_ = new BasicDB*[dbnum_];
+ if (tmppath_.empty()) {
+ report(_KCCODELINE_, "started to open temporary databases on memory");
+ double stime = time();
+ for (size_t i = 0; i < dbnum_; i++) {
+ GrassDB* gdb = new GrassDB;
+ gdb->tune_options(GrassDB::TCOMPRESS);
+ gdb->tune_buckets(DBBNUM / 2);
+ gdb->tune_page(DBPSIZ);
+ gdb->tune_page_cache(DBPCCAP);
+ gdb->tune_comparator(rcomp_);
+ gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE);
+ tmpdbs_[i] = gdb;
+ }
+ double etime = time();
+ report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime);
+ } else {
+ File::Status sbuf;
+ if (!File::status(tmppath_, &sbuf) || !sbuf.isdir) {
+ set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory");
+ delete[] tmpdbs_;
+ db_.close();
+ return false;
+ }
+ report(_KCCODELINE_, "started to open temporary databases under %s", tmppath.c_str());
+ double stime = time();
+ uint32_t pid = getpid() & UINT16MAX;
+ uint32_t tid = Thread::hash() & UINT16MAX;
+ uint32_t ts = time() * 1000;
+ bool err = false;
+ for (size_t i = 0; i < dbnum_; i++) {
+ std::string childpath =
+ strprintf("%s%cidx-%04x-%04x-%08x-%03d%ckct",
+ tmppath_.c_str(), File::PATHCHR, pid, tid, ts,
+ (int)(i + 1), File::EXTCHR);
+ TreeDB* tdb = new TreeDB;
+ tdb->tune_options(TreeDB::TSMALL | TreeDB::TLINEAR);
+ tdb->tune_buckets(DBBNUM);
+ tdb->tune_page(DBPSIZ);
+ tdb->tune_map(DBMSIZ);
+ tdb->tune_page_cache(DBPCCAP);
+ tdb->tune_comparator(rcomp_);
+ if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) {
+ const BasicDB::Error& e = tdb->error();
+ set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ tmpdbs_[i] = tdb;
+ }
+ double etime = time();
+ report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime);
+ if (err) {
+ for (size_t i = 0; i < dbnum_; i++) {
+ delete tmpdbs_[i];
+ }
+ delete[] tmpdbs_;
+ db_.close();
+ return false;
+ }
+ }
+ } else {
+ tmpdbs_ = NULL;
+ }
+ if (mode & BasicDB::OWRITER) {
+ cache_ = new TinyHashMap(cbnum > 0 ? cbnum : DEFCBNUM);
+ } else {
+ cache_ = NULL;
+ }
+ clim_ = clim > 0 ? clim : DEFCLIM;
+ csiz_ = 0;
+ omode_ = mode;
+ return true;
+ }
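+  // A hedged example of the tuning syntax described above; the file name and values are
+  // illustrative assumptions:
+  //
+  //   db.open("casket.kct#idxclim=256m#idxdbnum=8#idxtmppath=.",
+  //           BasicDB::OWRITER | BasicDB::OCREATE);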
+ /**
+ * Close the database file.
+ * @return true on success, or false on failure.
+ */
+ bool close() {
+ _assert_(true);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ bool err = false;
+ if (cache_) {
+ if (!flush_cache()) err = true;
+ delete cache_;
+ if (tmpdbs_) {
+ if (!merge_tmpdbs()) err = true;
+ report(_KCCODELINE_, "closing the temporary databases");
+ double stime = time();
+ for (size_t i = 0; i < dbnum_; i++) {
+ BasicDB* tmpdb = tmpdbs_[i];
+ const std::string& path = tmpdb->path();
+ if (!tmpdb->close()) {
+ const BasicDB::Error& e = tmpdb->error();
+ set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ if (!tmppath_.empty()) File::remove(path);
+ delete tmpdb;
+ }
+ double etime = time();
+ report(_KCCODELINE_, "closing the temporary databases finished: %.6f", etime - stime);
+ delete[] tmpdbs_;
+ }
+ }
+ if (!db_.close()) err = true;
+ omode_ = 0;
+ return !err;
+ }
+ /**
+ * Set the value of a record.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param vbuf the pointer to the value region.
+ * @param vsiz the size of the value region.
+ * @return true on success, or false on failure.
+ * @note If no record corresponds to the key, a new record is created. If the corresponding
+ * record exists, the value is overwritten.
+ */
+ bool set(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ if (!cache_) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
+ return false;
+ }
+ bool err = false;
+ if (!clean_dbs(kbuf, ksiz)) err = true;
+ cache_->set(kbuf, ksiz, vbuf, vsiz);
+ csiz_ += ksiz + vsiz;
+    if (csiz_ > clim_ && !flush_cache()) err = true;
+ return !err;
+ }
+ /**
+ * Set the value of a record.
+ * @note Equal to the original DB::set method except that the parameters are std::string.
+ */
+ bool set(const std::string& key, const std::string& value) {
+ _assert_(true);
+ return set(key.c_str(), key.size(), value.c_str(), value.size());
+ }
+ /**
+ * Add a record.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param vbuf the pointer to the value region.
+ * @param vsiz the size of the value region.
+ * @return true on success, or false on failure.
+ * @note If no record corresponds to the key, a new record is created. If the corresponding
+ * record exists, the record is not modified and false is returned.
+ */
+ bool add(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ if (!cache_) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
+ return false;
+ }
+ if (check_impl(kbuf, ksiz)) {
+ set_error(_KCCODELINE_, BasicDB::Error::DUPREC, "record duplication");
+ return false;
+ }
+ bool err = false;
+ cache_->set(kbuf, ksiz, vbuf, vsiz);
+ csiz_ += ksiz + vsiz;
+    if (csiz_ > clim_ && !flush_cache()) err = true;
+ return !err;
+ }
+ /**
+   * Add a record.
+ * @note Equal to the original DB::add method except that the parameters are std::string.
+ */
+ bool add(const std::string& key, const std::string& value) {
+ _assert_(true);
+ return add(key.c_str(), key.size(), value.c_str(), value.size());
+ }
+ /**
+ * Replace the value of a record.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param vbuf the pointer to the value region.
+ * @param vsiz the size of the value region.
+ * @return true on success, or false on failure.
+ * @note If no record corresponds to the key, no new record is created and false is returned.
+ * If the corresponding record exists, the value is modified.
+ */
+ bool replace(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ if (!cache_) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
+ return false;
+ }
+ if (!check_impl(kbuf, ksiz)) {
+ set_error(_KCCODELINE_, BasicDB::Error::NOREC, "no record");
+ return false;
+ }
+ bool err = false;
+ if (!clean_dbs(kbuf, ksiz)) err = true;
+ cache_->set(kbuf, ksiz, vbuf, vsiz);
+ csiz_ += ksiz + vsiz;
+    if (csiz_ > clim_ && !flush_cache()) err = true;
+ return !err;
+ }
+ /**
+ * Replace the value of a record.
+ * @note Equal to the original DB::replace method except that the parameters are std::string.
+ */
+ bool replace(const std::string& key, const std::string& value) {
+ _assert_(true);
+ return replace(key.c_str(), key.size(), value.c_str(), value.size());
+ }
+ /**
+ * Append the value of a record.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param vbuf the pointer to the value region.
+ * @param vsiz the size of the value region.
+ * @return true on success, or false on failure.
+ * @note If no record corresponds to the key, a new record is created. If the corresponding
+ * record exists, the given value is appended at the end of the existing value.
+ */
+ bool append(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ if (!cache_) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
+ return false;
+ }
+ bool err = false;
+ cache_->append(kbuf, ksiz, vbuf, vsiz);
+ csiz_ += ksiz + vsiz;
+    if (csiz_ > clim_ && !flush_cache()) err = true;
+ return !err;
+ }
+ /**
+   * Append the value of a record.
+ * @note Equal to the original DB::append method except that the parameters are std::string.
+ */
+ bool append(const std::string& key, const std::string& value) {
+ _assert_(true);
+ return append(key.c_str(), key.size(), value.c_str(), value.size());
+ }
+ /**
+ * Remove a record.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @return true on success, or false on failure.
+ * @note If no record corresponds to the key, false is returned.
+ */
+ bool remove(const char* kbuf, size_t ksiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ if (!cache_) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
+ return false;
+ }
+ bool err = false;
+ if (!clean_dbs(kbuf, ksiz)) err = true;
+ cache_->remove(kbuf, ksiz);
+ return !err;
+ }
+ /**
+ * Remove a record.
+ * @note Equal to the original DB::remove method except that the parameter is std::string.
+ */
+ bool remove(const std::string& key) {
+ _assert_(true);
+ return remove(key.c_str(), key.size());
+ }
+ /**
+ * Retrieve the value of a record.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @param sp the pointer to the variable into which the size of the region of the return
+ * value is assigned.
+ * @return the pointer to the value region of the corresponding record, or NULL on failure.
+ * @note If no record corresponds to the key, NULL is returned. Because an additional zero
+ * code is appended at the end of the region of the return value, the return value can be
+   * treated as a C-style string. Because the region of the return value is allocated with
+   * the new[] operator, it should be released with the delete[] operator when it is no longer
+ * in use.
+ */
+ char* get(const char* kbuf, size_t ksiz, size_t* sp) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ && sp);
+ ScopedRWLock lock(&mlock_, false);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ *sp = 0;
+      return NULL;
+ }
+ if (!cache_) return db_.get(kbuf, ksiz, sp);
+ size_t dvsiz = 0;
+ char* dvbuf = db_.get(kbuf, ksiz, &dvsiz);
+ size_t cvsiz = 0;
+ const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz);
+ struct Record {
+ char* buf;
+ size_t size;
+ };
+ Record* recs = NULL;
+ bool hit = false;
+ size_t rsiz = 0;
+ if (tmpdbs_) {
+ recs = new Record[dbnum_];
+ for (size_t i = 0; i < dbnum_; i++) {
+ BasicDB* tmpdb = tmpdbs_[i];
+ Record* rp = recs + i;
+ rp->buf = tmpdb->get(kbuf, ksiz, &rp->size);
+ if (rp->buf) {
+ rsiz += rp->size;
+ hit = true;
+ }
+ }
+ }
+ if (!hit) {
+ delete[] recs;
+ if (!dvbuf && !cvbuf) {
+ *sp = 0;
+ return NULL;
+ }
+ if (!dvbuf) {
+ dvbuf = new char[cvsiz+1];
+        std::memcpy(dvbuf, cvbuf, cvsiz);
+        dvbuf[cvsiz] = '\0';
+        *sp = cvsiz;
+ return dvbuf;
+ }
+ if (!cvbuf) {
+ *sp = dvsiz;
+ return dvbuf;
+ }
+ char* rbuf = new char[dvsiz+cvsiz+1];
+ std::memcpy(rbuf, dvbuf, dvsiz);
+      std::memcpy(rbuf + dvsiz, cvbuf, cvsiz);
+      rbuf[dvsiz+cvsiz] = '\0';
+      delete[] dvbuf;
+      *sp = dvsiz + cvsiz;
+ return rbuf;
+ }
+ if (dvbuf) rsiz += dvsiz;
+ if (cvbuf) rsiz += cvsiz;
+ char* rbuf = new char[rsiz+1];
+ char* wp = rbuf;
+ if (dvbuf) {
+ std::memcpy(wp, dvbuf, dvsiz);
+ wp += dvsiz;
+ delete[] dvbuf;
+ }
+ if (cvbuf) {
+ std::memcpy(wp, cvbuf, cvsiz);
+ wp += cvsiz;
+ }
+ if (recs) {
+ for (size_t i = 0; i < dbnum_; i++) {
+ Record* rp = recs + i;
+ if (rp->buf) {
+ std::memcpy(wp, rp->buf, rp->size);
+ wp += rp->size;
+ delete[] rp->buf;
+ }
+ }
+ delete[] recs;
+ }
+    *wp = '\0';
+    *sp = rsiz;
+ return rbuf;
+ }
+ /**
+ * Retrieve the value of a record.
+   * @note Equal to the original DB::get method except that the first parameter is the key
+   * string, the second parameter is a string to contain the result, and the return value is
+   * bool for success.
+ */
+ bool get(const std::string& key, std::string* value) {
+ _assert_(value);
+ size_t vsiz;
+ char* vbuf = get(key.c_str(), key.size(), &vsiz);
+ if (!vbuf) return false;
+ value->clear();
+ value->append(vbuf, vsiz);
+ delete[] vbuf;
+ return true;
+ }
+ /**
+ * Synchronize updated contents with the file and the device.
+ * @param hard true for physical synchronization with the device, or false for logical
+ * synchronization with the file system.
+ * @param proc a postprocessor object. If it is NULL, no postprocessing is performed.
+ * @return true on success, or false on failure.
+ * @note The operation of the postprocessor is performed atomically and other threads accessing
+ * the same record are blocked. To avoid deadlock, any explicit database operation must not
+ * be performed in this function.
+ */
+ bool synchronize(bool hard = false, BasicDB::FileProcessor* proc = NULL) {
+ _assert_(true);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ if (!cache_) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
+ return false;
+ }
+ bool err = false;
+ if (!flush_cache()) err = true;
+ if (tmpdbs_ && !merge_tmpdbs()) err = true;
+ if (!db_.synchronize(hard, proc)) err = true;
+ return !err;
+ }
+ /**
+ * Remove all records.
+ * @return true on success, or false on failure.
+ */
+ bool clear() {
+ _assert_(true);
+ ScopedRWLock lock(&mlock_, true);
+ if (omode_ == 0) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened");
+ return false;
+ }
+ if (!cache_) {
+ set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied");
+ return false;
+ }
+ cache_->clear();
+ csiz_ = 0;
+ return db_.clear();
+ }
+ /**
+ * Get the number of records.
+ * @return the number of records, or -1 on failure.
+ */
+ int64_t count() {
+ _assert_(true);
+ ScopedRWLock lock(&mlock_, false);
+ return count_impl();
+ }
+ /**
+ * Get the size of the database file.
+ * @return the size of the database file in bytes, or -1 on failure.
+ */
+ int64_t size() {
+ _assert_(true);
+ ScopedRWLock lock(&mlock_, false);
+ return size_impl();
+ }
+ /**
+ * Get the path of the database file.
+ * @return the path of the database file, or an empty string on failure.
+ */
+ std::string path() {
+ _assert_(true);
+ return db_.path();
+ }
+ /**
+ * Get the miscellaneous status information.
+ * @param strmap a string map to contain the result.
+ * @return true on success, or false on failure.
+ */
+ bool status(std::map<std::string, std::string>* strmap) {
+ _assert_(strmap);
+ return db_.status(strmap);
+ }
+ /**
+ * Reveal the inner database object.
+ * @return the inner database object, or NULL on failure.
+ */
+ PolyDB* reveal_inner_db() {
+ _assert_(true);
+ return &db_;
+ }
+ /**
+ * Create a cursor object.
+ * @return the return value is the created cursor object.
+ * @note Because the object of the return value is allocated by the constructor, it should be
+ * released with the delete operator when it is no longer in use.
+ */
+ BasicDB::Cursor* cursor() {
+ _assert_(true);
+ return db_.cursor();
+ }
+ /**
+ * Write a log message.
+ * @param file the file name of the program source code.
+ * @param line the line number of the program source code.
+ * @param func the function name of the program source code.
+ * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
+ * information, Logger::WARN for warning, and Logger::ERROR for fatal error.
+ * @param message the supplement message.
+ */
+ void log(const char* file, int32_t line, const char* func, BasicDB::Logger::Kind kind,
+ const char* message) {
+ _assert_(file && line > 0 && func && message);
+ db_.log(file, line, func, kind, message);
+ }
+ /**
+ * Set the internal logger.
+ * @param logger the logger object.
+ * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
+ * Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal
+ * error.
+ * @return true on success, or false on failure.
+ */
+ bool tune_logger(BasicDB::Logger* logger,
+ uint32_t kinds = BasicDB::Logger::WARN | BasicDB::Logger::ERROR) {
+ _assert_(logger);
+ return db_.tune_logger(logger, kinds);
+ }
+ /**
+ * Set the internal meta operation trigger.
+ * @param trigger the trigger object.
+ * @return true on success, or false on failure.
+ */
+ bool tune_meta_trigger(BasicDB::MetaTrigger* trigger) {
+ _assert_(trigger);
+ return db_.tune_meta_trigger(trigger);
+ }
+ protected:
+ /**
+ * Report a message for debugging.
+ * @param file the file name of the program source code.
+ * @param line the line number of the program source code.
+ * @param func the function name of the program source code.
+ * @param format the printf-like format string.
+ * @param ... used according to the format string.
+ */
+ void report(const char* file, int32_t line, const char* func, const char* format, ...) {
+ _assert_(file && line > 0 && func && format);
+ std::string message;
+ va_list ap;
+ va_start(ap, format);
+ vstrprintf(&message, format, ap);
+ va_end(ap);
+ db_.log(file, line, func, BasicDB::Logger::INFO, message.c_str());
+ }
+ private:
+ /**
+ * Flush all cache records.
+ * @return true on success, or false on failure.
+ */
+ bool flush_cache() {
+ _assert_(true);
+ bool err = false;
+ double stime = time();
+ report(_KCCODELINE_, "flushing the cache");
+ if (tmpdbs_) {
+ BasicDB* tmpdb = tmpdbs_[dbclock_];
+ TinyHashMap::Sorter sorter(cache_);
+ const char* kbuf, *vbuf;
+ size_t ksiz, vsiz;
+ while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
+ if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) {
+ const BasicDB::Error& e = tmpdb->error();
+ db_.set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ sorter.step();
+ }
+ dbclock_ = (dbclock_ + 1) % dbnum_;
+ } else {
+ TinyHashMap::Sorter sorter(cache_);
+ const char* kbuf, *vbuf;
+ size_t ksiz, vsiz;
+ while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) {
+ if (!db_.append(kbuf, ksiz, vbuf, vsiz)) err = true;
+ sorter.step();
+ }
+ }
+ cache_->clear();
+ csiz_ = 0;
+ double etime = time();
+ report(_KCCODELINE_, "flushing the cache finished: time=%.6f", etime - stime);
+ return !err;
+ }
+ /**
+ * Merge temporary databases.
+ * @return true on success, or false on failure.
+ */
+ bool merge_tmpdbs() {
+ _assert_(true);
+ bool err = false;
+ report(_KCCODELINE_, "merging the temporary databases");
+ double stime = time();
+ if (!db_.merge(tmpdbs_, dbnum_, PolyDB::MAPPEND)) err = true;
+ dbclock_ = 0;
+ for (size_t i = 0; i < dbnum_; i++) {
+ BasicDB* tmpdb = tmpdbs_[i];
+ if (!tmpdb->clear()) {
+ const BasicDB::Error& e = tmpdb->error();
+ set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ }
+ double etime = time();
+ report(_KCCODELINE_, "merging the temporary databases finished: %.6f", etime - stime);
+ return !err;
+ }
+ /**
+ * Remove a record from databases.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @return true on success, or false on failure.
+ */
+ bool clean_dbs(const char* kbuf, size_t ksiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ);
+ if (db_.remove(kbuf, ksiz)) return true;
+ bool err = false;
+ if (db_.error() != BasicDB::Error::NOREC) err = true;
+ if (tmpdbs_) {
+ for (size_t i = 0; i < dbnum_; i++) {
+ BasicDB* tmpdb = tmpdbs_[i];
+ if (!tmpdb->remove(kbuf, ksiz)) {
+ const BasicDB::Error& e = tmpdb->error();
+ if (e != BasicDB::Error::NOREC) {
+ set_error(_KCCODELINE_, e.code(), e.message());
+ err = true;
+ }
+ }
+ }
+ }
+ return !err;
+ }
+ /**
+ * Check whether a record exists.
+ * @param kbuf the pointer to the key region.
+ * @param ksiz the size of the key region.
+ * @return true if the record exists, or false if not.
+ */
+ bool check_impl(const char* kbuf, size_t ksiz) {
+ _assert_(kbuf && ksiz <= MEMMAXSIZ);
+ char vbuf;
+ if (db_.get(kbuf, ksiz, &vbuf, 1) >= 0) return true;
+ if (cache_) {
+ size_t vsiz;
+ if (cache_->get(kbuf, ksiz, &vsiz)) return true;
+ if (tmpdbs_) {
+ for (size_t i = 0; i < dbnum_; i++) {
+ BasicDB* tmpdb = tmpdbs_[i];
+          if (tmpdb->get(kbuf, ksiz, &vbuf, 1) >= 0) return true;
+ }
+ }
+ }
+ return false;
+ }
+ /**
+ * Get the number of records.
+ * @return the number of records, or -1 on failure.
+ */
+ int64_t count_impl() {
+ _assert_(true);
+ int64_t dbcnt = db_.count();
+ if (dbcnt < 0) return -1;
+ if (!cache_) return dbcnt;
+ int64_t ccnt = cache_->count();
+ return dbcnt > ccnt ? dbcnt : ccnt;
+ }
+ /**
+ * Get the size of the database file.
+ * @return the size of the database file in bytes.
+ */
+ int64_t size_impl() {
+ _assert_(true);
+ int64_t dbsiz = db_.size();
+ if (dbsiz < 0) return -1;
+ return dbsiz + csiz_;
+ }
+ /** Dummy constructor to forbid the use. */
+ IndexDB(const IndexDB&);
+ /** Dummy Operator to forbid the use. */
+ IndexDB& operator =(const IndexDB&);
+ /** The method lock. */
+ RWLock mlock_;
+ /** The internal database. */
+ PolyDB db_;
+ /** The open mode. */
+ uint32_t omode_;
+ /** The record comparator. */
+ Comparator* rcomp_;
+ /** The base path of temporary databases. */
+ std::string tmppath_;
+ /** The temporary databases. */
+ BasicDB** tmpdbs_;
+ /** The number of temporary databases. */
+ size_t dbnum_;
+ /** The logical clock for temporary databases. */
+ int64_t dbclock_;
+ /** The internal cache. */
+ TinyHashMap* cache_;
+ /** The current size of the internal cache. */
+ int64_t csiz_;
+ /** The limit size of the internal cache. */
+ int64_t clim_;
+};
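+
+/*
+ * A minimal usage sketch of IndexDB; the file name, keys, and values are illustrative
+ * assumptions:
+ *
+ *   #include <kcdbext.h>
+ *
+ *   int main() {
+ *     kyotocabinet::IndexDB db;
+ *     if (!db.open("casket.kct#idxclim=128m",
+ *                  kyotocabinet::BasicDB::OWRITER | kyotocabinet::BasicDB::OCREATE)) return 1;
+ *     db.append("apple", "red ");            // postings for the same key accumulate
+ *     db.append("apple", "green ");
+ *     std::string value;
+ *     if (db.get("apple", &value)) {
+ *       // value now holds the concatenated postings for "apple"
+ *     }
+ *     return db.close() ? 0 : 1;
+ *   }
+ */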
+
+
+} // common namespace
+
+#endif // duplication check
+
+// END OF FILE