diff options
Diffstat (limited to 'plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h')
| -rw-r--r-- | plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h | 1688 |
1 files changed, 0 insertions, 1688 deletions
diff --git a/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h b/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h deleted file mode 100644 index 9a6beb1360..0000000000 --- a/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h +++ /dev/null @@ -1,1688 +0,0 @@ -/************************************************************************************************* - * Database extension - * Copyright (C) 2009-2012 FAL Labs - * This file is part of Kyoto Cabinet. - * This program is free software: you can redistribute it and/or modify it under the terms of - * the GNU General Public License as published by the Free Software Foundation, either version - * 3 of the License, or any later version. - * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. - * You should have received a copy of the GNU General Public License along with this program. - * If not, see <http://www.gnu.org/licenses/>. - *************************************************************************************************/ - - -#ifndef _KCDBEXT_H // duplication check -#define _KCDBEXT_H - -#include <kccommon.h> -#include <kcutil.h> -#include <kcthread.h> -#include <kcfile.h> -#include <kccompress.h> -#include <kccompare.h> -#include <kcmap.h> -#include <kcregex.h> -#include <kcdb.h> -#include <kcplantdb.h> -#include <kcprotodb.h> -#include <kcstashdb.h> -#include <kccachedb.h> -#include <kchashdb.h> -#include <kcdirdb.h> -#include <kcpolydb.h> - -namespace kyotocabinet { // common namespace - - -/** - * MapReduce framework. - * @note Although this framework is not distributed or concurrent, it is useful for aggregate - * calculation with less CPU loading and less memory usage. - */ -class MapReduce { - public: - class ValueIterator; - private: - class FlushThread; - class ReduceTaskQueue; - class MapVisitor; - struct MergeLine; - /** An alias of vector of loaded values. */ - typedef std::vector<std::string> Values; - /** The default number of temporary databases. */ - static const size_t DEFDBNUM = 8; - /** The maxinum number of temporary databases. */ - static const size_t MAXDBNUM = 256; - /** The default cache limit. */ - static const int64_t DEFCLIM = 512LL << 20; - /** The default cache bucket numer. */ - static const int64_t DEFCBNUM = 1048583LL; - /** The bucket number of temprary databases. */ - static const int64_t DBBNUM = 512LL << 10; - /** The page size of temprary databases. */ - static const int32_t DBPSIZ = 32768; - /** The mapped size of temprary databases. */ - static const int64_t DBMSIZ = 516LL * 4096; - /** The page cache capacity of temprary databases. */ - static const int64_t DBPCCAP = 16LL << 20; - /** The default number of threads in parallel mode. */ - static const size_t DEFTHNUM = 8; - /** The number of slots of the record lock. */ - static const int32_t RLOCKSLOT = 256; - public: - /** - * Value iterator for the reducer. - */ - class ValueIterator { - friend class MapReduce; - public: - /** - * Get the next value. - * @param sp the pointer to the variable into which the size of the region of the return - * value is assigned. - * @return the pointer to the next value region, or NULL if no value remains. - */ - const char* next(size_t* sp) { - _assert_(sp); - if (!vptr_) { - if (vit_ == vend_) return NULL; - vptr_ = vit_->data(); - vsiz_ = vit_->size(); - vit_++; - } - uint64_t vsiz; - size_t step = readvarnum(vptr_, vsiz_, &vsiz); - vptr_ += step; - vsiz_ -= step; - const char* vbuf = vptr_; - *sp = vsiz; - vptr_ += vsiz; - vsiz_ -= vsiz; - if (vsiz_ < 1) vptr_ = NULL; - return vbuf; - } - private: - /** - * Default constructor. - */ - explicit ValueIterator(Values::const_iterator vit, Values::const_iterator vend) : - vit_(vit), vend_(vend), vptr_(NULL), vsiz_(0) { - _assert_(true); - } - /** - * Destructor. - */ - ~ValueIterator() { - _assert_(true); - } - /** Dummy constructor to forbid the use. */ - ValueIterator(const ValueIterator&); - /** Dummy Operator to forbid the use. */ - ValueIterator& operator =(const ValueIterator&); - /** The current iterator of loaded values. */ - Values::const_iterator vit_; - /** The ending iterator of loaded values. */ - Values::const_iterator vend_; - /** The pointer of the current value. */ - const char* vptr_; - /** The size of the current value. */ - size_t vsiz_; - }; - /** - * Execution options. - */ - enum Option { - XNOLOCK = 1 << 0, ///< avoid locking against update operations - XPARAMAP = 1 << 1, ///< run mappers in parallel - XPARARED = 1 << 2, ///< run reducers in parallel - XPARAFLS = 1 << 3, ///< run cache flushers in parallel - XNOCOMP = 1 << 8 ///< avoid compression of temporary databases - }; - /** - * Default constructor. - */ - explicit MapReduce() : - db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0), - mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM), - cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NULL), - redtasks_(NULL), redaborted_(false), rlocks_(NULL) { - _assert_(true); - } - /** - * Destructor. - */ - virtual ~MapReduce() { - _assert_(true); - } - /** - * Map a record data. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param vbuf the pointer to the value region. - * @param vsiz the size of the value region. - * @return true on success, or false on failure. - * @note This function can call the MapReduce::emit method to emit a record. To avoid - * deadlock, any explicit database operation must not be performed in this function. - */ - virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) = 0; - /** - * Reduce a record data. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param iter the iterator to get the values. - * @return true on success, or false on failure. - * @note To avoid deadlock, any explicit database operation must not be performed in this - * function. - */ - virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; - /** - * Preprocess the map operations. - * @return true on success, or false on failure. - * @note This function can call the MapReduce::emit method to emit a record. To avoid - * deadlock, any explicit database operation must not be performed in this function. - */ - virtual bool preprocess() { - _assert_(true); - return true; - } - /** - * Mediate between the map and the reduce phases. - * @return true on success, or false on failure. - * @note This function can call the MapReduce::emit method to emit a record. To avoid - * deadlock, any explicit database operation must not be performed in this function. - */ - virtual bool midprocess() { - _assert_(true); - return true; - } - /** - * Postprocess the reduce operations. - * @return true on success, or false on failure. - * @note To avoid deadlock, any explicit database operation must not be performed in this - * function. - */ - virtual bool postprocess() { - _assert_(true); - return true; - } - /** - * Process a log message. - * @param name the name of the event. - * @param message a supplement message. - * @return true on success, or false on failure. - */ - virtual bool log(const char* name, const char* message) { - _assert_(name && message); - return true; - } - /** - * Execute the MapReduce process about a database. - * @param db the source database. - * @param tmppath the path of a directory for the temporary data storage. If it is an empty - * string, temporary data are handled on memory. - * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking - * against update operations by other threads, MapReduce::XPARAMAP to run the mapper in - * parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduce::XNOCOMP to avoid - * compression of temporary databases. - * @return true on success, or false on failure. - */ - bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { - int64_t count = db->count(); - if (count < 0) { - if (db->error() != BasicDB::Error::NOIMPL) return false; - count = 0; - } - bool err = false; - double stime, etime; - db_ = db; - rcomp_ = LEXICALCOMP; - BasicDB* idb = db; - if (typeid(*db) == typeid(PolyDB)) { - PolyDB* pdb = (PolyDB*)idb; - idb = pdb->reveal_inner_db(); - } - const std::type_info& info = typeid(*idb); - if (info == typeid(GrassDB)) { - GrassDB* gdb = (GrassDB*)idb; - rcomp_ = gdb->rcomp(); - } else if (info == typeid(TreeDB)) { - TreeDB* tdb = (TreeDB*)idb; - rcomp_ = tdb->rcomp(); - } else if (info == typeid(ForestDB)) { - ForestDB* fdb = (ForestDB*)idb; - rcomp_ = fdb->rcomp(); - } - tmpdbs_ = new BasicDB*[dbnum_]; - if (tmppath.empty()) { - if (!logf("prepare", "started to open temporary databases on memory")) err = true; - stime = time(); - for (size_t i = 0; i < dbnum_; i++) { - GrassDB* gdb = new GrassDB; - int32_t myopts = 0; - if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS; - gdb->tune_options(myopts); - gdb->tune_buckets(DBBNUM / 2); - gdb->tune_page(DBPSIZ); - gdb->tune_page_cache(DBPCCAP); - gdb->tune_comparator(rcomp_); - gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE); - tmpdbs_[i] = gdb; - } - etime = time(); - if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) - err = true; - if (err) { - delete[] tmpdbs_; - return false; - } - } else { - File::Status sbuf; - if (!File::status(tmppath, &sbuf) || !sbuf.isdir) { - db->set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory"); - delete[] tmpdbs_; - return false; - } - if (!logf("prepare", "started to open temporary databases under %s", tmppath.c_str())) - err = true; - stime = time(); - uint32_t pid = getpid() & UINT16MAX; - uint32_t tid = Thread::hash() & UINT16MAX; - uint32_t ts = time() * 1000; - for (size_t i = 0; i < dbnum_; i++) { - std::string childpath = - strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", - tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR); - TreeDB* tdb = new TreeDB; - int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR; - if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS; - tdb->tune_options(myopts); - tdb->tune_buckets(DBBNUM); - tdb->tune_page(DBPSIZ); - tdb->tune_map(DBMSIZ); - tdb->tune_page_cache(DBPCCAP); - tdb->tune_comparator(rcomp_); - if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) { - const BasicDB::Error& e = tdb->error(); - db->set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - tmpdbs_[i] = tdb; - } - etime = time(); - if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) - err = true; - if (err) { - for (size_t i = 0; i < dbnum_; i++) { - delete tmpdbs_[i]; - } - delete[] tmpdbs_; - return false; - } - } - if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; - if (opts & XPARAFLS) flsths_ = new std::deque<FlushThread*>; - if (opts & XNOLOCK) { - MapChecker mapchecker; - MapVisitor mapvisitor(this, &mapchecker, count); - mapvisitor.visit_before(); - if (!err) { - BasicDB::Cursor* cur = db->cursor(); - if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = true; - while (!err) { - if (!cur->accept(&mapvisitor, false, true)) { - if (cur->error() != BasicDB::Error::NOREC) err = true; - break; - } - } - delete cur; - } - if (mapvisitor.error()) { - db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); - err = true; - } - mapvisitor.visit_after(); - } else if (opts & XPARAMAP) { - MapChecker mapchecker; - MapVisitor mapvisitor(this, &mapchecker, count); - rlocks_ = new SlottedMutex(RLOCKSLOT); - if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) { - db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); - err = true; - } - delete rlocks_; - rlocks_ = NULL; - if (mapvisitor.error()) err = true; - } else { - MapChecker mapchecker; - MapVisitor mapvisitor(this, &mapchecker, count); - if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true; - if (mapvisitor.error()) { - db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); - err = true; - } - } - if (flsths_) { - delete flsths_; - flsths_ = NULL; - } - if (redtasks_) { - delete redtasks_; - redtasks_ = NULL; - } - if (!logf("clean", "closing the temporary databases")) err = true; - stime = time(); - for (size_t i = 0; i < dbnum_; i++) { - const std::string& path = tmpdbs_[i]->path(); - if (!tmpdbs_[i]->clear()) { - const BasicDB::Error& e = tmpdbs_[i]->error(); - db->set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - if (!tmpdbs_[i]->close()) { - const BasicDB::Error& e = tmpdbs_[i]->error(); - db->set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - if (!tmppath.empty()) File::remove(path); - delete tmpdbs_[i]; - } - etime = time(); - if (!logf("clean", "closing the temporary databases finished: time=%.6f", - etime - stime)) err = true; - delete[] tmpdbs_; - return !err; - } - /** - * Set the storage configurations. - * @param dbnum the number of temporary databases. - * @param clim the limit size of the internal cache. - * @param cbnum the bucket number of the internal cache. - */ - void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { - _assert_(true); - dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; - if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; - clim_ = clim > 0 ? clim : DEFCLIM; - cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; - if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); - } - /** - * Set the thread configurations. - * @param mapthnum the number of threads for the mapper. - * @param redthnum the number of threads for the reducer. - * @param flsthnum the number of threads for the internal flusher. - */ - void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) { - _assert_(true); - mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; - redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; - flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM; - } - protected: - /** - * Emit a record from the mapper. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param vbuf the pointer to the value region. - * @param vsiz the size of the value region. - * @return true on success, or false on failure. - */ - bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); - bool err = false; - size_t rsiz = sizevarnum(vsiz) + vsiz; - char stack[NUMBUFSIZ*4]; - char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; - char* wp = rbuf; - wp += writevarnum(rbuf, vsiz); - std::memcpy(wp, vbuf, vsiz); - if (rlocks_) { - size_t bidx = TinyHashMap::hash_record(kbuf, ksiz) % cbnum_; - size_t lidx = bidx % RLOCKSLOT; - rlocks_->lock(lidx); - cache_->append(kbuf, ksiz, rbuf, rsiz); - rlocks_->unlock(lidx); - } else { - cache_->append(kbuf, ksiz, rbuf, rsiz); - } - if (rbuf != stack) delete[] rbuf; - csiz_ += sizevarnum(ksiz) + ksiz + rsiz; - return !err; - } - private: - /** - * Cache flusher. - */ - class FlushThread : public Thread { - public: - /** constructor */ - explicit FlushThread(MapReduce* mr, BasicDB* tmpdb, - TinyHashMap* cache, size_t csiz, bool cown) : - mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), err_(false) {} - /** perform the concrete process */ - void run() { - if (!mr_->logf("map", "started to flushing the cache: count=%lld size=%lld", - (long long)cache_->count(), (long long)csiz_)) err_ = true; - double stime = time(); - BasicDB* tmpdb = tmpdb_; - TinyHashMap* cache = cache_; - bool cown = cown_; - TinyHashMap::Sorter sorter(cache); - const char* kbuf, *vbuf; - size_t ksiz, vsiz; - while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { - if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { - const BasicDB::Error& e = tmpdb->error(); - mr_->db_->set_error(_KCCODELINE_, e.code(), e.message()); - err_ = true; - } - sorter.step(); - if (cown) cache->remove(kbuf, ksiz); - } - double etime = time(); - if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime - stime)) - err_ = true; - if (cown) delete cache; - } - /** check the error flag. */ - bool error() { - return err_; - } - private: - MapReduce* mr_; ///< driver - BasicDB* tmpdb_; ///< temprary database - TinyHashMap* cache_; ///< cache for emitter - size_t csiz_; ///< current cache size - bool cown_; ///< cache ownership flag - bool err_; ///< error flag - }; - /** - * Task queue for parallel reducer. - */ - class ReduceTaskQueue : public TaskQueue { - public: - /** - * Task for parallel reducer. - */ - class ReduceTask : public Task { - friend class ReduceTaskQueue; - public: - /** constructor */ - explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, const Values& values) : - mr_(mr), key_(kbuf, ksiz), values_(values) {} - private: - MapReduce* mr_; ///< driver - std::string key_; ///< key - Values values_; ///< values - }; - /** constructor */ - explicit ReduceTaskQueue() {} - private: - /** process a task */ - void do_task(Task* task) { - ReduceTask* rtask = (ReduceTask*)task; - ValueIterator iter(rtask->values_.begin(), rtask->values_.end()); - if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter)) - rtask->mr_->redaborted_ = true; - delete rtask; - } - }; - /** - * Checker for the map process. - */ - class MapChecker : public BasicDB::ProgressChecker { - public: - /** constructor */ - explicit MapChecker() : stop_(false) {} - /** stop the process */ - void stop() { - stop_ = true; - } - /** check whether stopped */ - bool stopped() { - return stop_; - } - private: - /** check whether stopped */ - bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) { - return !stop_; - } - bool stop_; ///< flag for stop - }; - /** - * Visitor for the map process. - */ - class MapVisitor : public BasicDB::Visitor { - public: - /** constructor */ - explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) : - mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {} - /** get the error flag */ - bool error() { - return err_; - } - /** preprocess the mappter */ - void visit_before() { - mr_->dbclock_ = 0; - mr_->cache_ = new TinyHashMap(mr_->cbnum_); - mr_->csiz_ = 0; - if (!mr_->preprocess()) err_ = true; - if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; - if (!mr_->logf("map", "started the map process: scale=%lld", (long long)scale_)) - err_ = true; - stime_ = time(); - } - /** postprocess the mappter and call the reducer */ - void visit_after() { - if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; - double etime = time(); - if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_)) - err_ = true; - if (!mr_->midprocess()) err_ = true; - if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; - delete mr_->cache_; - if (mr_->flsths_ && !mr_->flsths_->empty()) { - std::deque<FlushThread*>::iterator flthit = mr_->flsths_->begin(); - std::deque<FlushThread*>::iterator flthitend = mr_->flsths_->end(); - while (flthit != flthitend) { - FlushThread* flth = *flthit; - flth->join(); - if (flth->error()) err_ = true; - delete flth; - ++flthit; - } - } - if (!err_ && !mr_->execute_reduce()) err_ = true; - if (!mr_->postprocess()) err_ = true; - } - private: - /** visit a record */ - const char* visit_full(const char* kbuf, size_t ksiz, - const char* vbuf, size_t vsiz, size_t* sp) { - if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { - checker_->stop(); - err_ = true; - } - if (mr_->rlocks_) { - if (mr_->csiz_ >= mr_->clim_) { - mr_->rlocks_->lock_all(); - if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { - checker_->stop(); - err_ = true; - } - mr_->rlocks_->unlock_all(); - } - } else { - if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { - checker_->stop(); - err_ = true; - } - } - return NOP; - } - MapReduce* mr_; ///< driver - MapChecker* checker_; ///< checker - int64_t scale_; ///< number of records - double stime_; ///< start time - bool err_; ///< error flag - }; - /** - * Front line of a merging list. - */ - struct MergeLine { - BasicDB::Cursor* cur; ///< cursor - Comparator* rcomp; ///< record comparator - char* kbuf; ///< pointer to the key - size_t ksiz; ///< size of the key - const char* vbuf; ///< pointer to the value - size_t vsiz; ///< size of the value - /** comparing operator */ - bool operator <(const MergeLine& right) const { - return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0; - } - }; - /** - * Process a log message. - * @param name the name of the event. - * @param format the printf-like format string. - * @param ... used according to the format string. - * @return true on success, or false on failure. - */ - bool logf(const char* name, const char* format, ...) { - _assert_(name && format); - va_list ap; - va_start(ap, format); - std::string message; - vstrprintf(&message, format, ap); - va_end(ap); - return log(name, message.c_str()); - } - /** - * Flush all cache records. - * @return true on success, or false on failure. - */ - bool flush_cache() { - _assert_(true); - bool err = false; - BasicDB* tmpdb = tmpdbs_[dbclock_]; - dbclock_ = (dbclock_ + 1) % dbnum_; - if (flsths_) { - size_t num = flsths_->size(); - if (num >= flsthnum_ || num >= dbnum_) { - FlushThread* flth = flsths_->front(); - flsths_->pop_front(); - flth->join(); - if (flth->error()) err = true; - delete flth; - } - FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true); - cache_ = new TinyHashMap(cbnum_); - csiz_ = 0; - flth->start(); - flsths_->push_back(flth); - } else { - FlushThread flth(this, tmpdb, cache_, csiz_, false); - flth.run(); - if (flth.error()) err = true; - cache_->clear(); - csiz_ = 0; - } - return !err; - } - /** - * Execute the reduce part. - * @return true on success, or false on failure. - */ - bool execute_reduce() { - bool err = false; - int64_t scale = 0; - for (size_t i = 0; i < dbnum_; i++) { - scale += tmpdbs_[i]->count(); - } - if (!logf("reduce", "started the reduce process: scale=%lld", (long long)scale)) err = true; - double stime = time(); - if (redtasks_) redtasks_->start(redthnum_); - std::priority_queue<MergeLine> lines; - for (size_t i = 0; i < dbnum_; i++) { - MergeLine line; - line.cur = tmpdbs_[i]->cursor(); - line.rcomp = rcomp_; - line.cur->jump(); - line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); - if (line.kbuf) { - lines.push(line); - } else { - delete line.cur; - } - } - char* lkbuf = NULL; - size_t lksiz = 0; - Values values; - while (!err && !lines.empty()) { - MergeLine line = lines.top(); - lines.pop(); - if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lksiz))) { - if (!call_reducer(lkbuf, lksiz, values)) { - db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed"); - err = true; - } - values.clear(); - } - delete[] lkbuf; - lkbuf = line.kbuf; - lksiz = line.ksiz; - values.push_back(std::string(line.vbuf, line.vsiz)); - line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); - if (line.kbuf) { - lines.push(line); - } else { - delete line.cur; - } - } - if (lkbuf) { - if (!err && !call_reducer(lkbuf, lksiz, values)) { - db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed"); - err = true; - } - delete[] lkbuf; - } - while (!lines.empty()) { - MergeLine line = lines.top(); - lines.pop(); - delete[] line.kbuf; - delete line.cur; - } - if (redtasks_) redtasks_->finish(); - double etime = time(); - if (!logf("reduce", "the reduce process finished: time=%.6f", etime - stime)) err = true; - return !err; - } - /** - * Call the reducer. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param values a vector of the values. - * @return true on success, or false on failure. - */ - bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { - _assert_(kbuf && ksiz <= MEMMAXSIZ); - if (redtasks_) { - if (redaborted_) return false; - ReduceTaskQueue::ReduceTask* task = - new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values); - redtasks_->add_task(task); - return true; - } - bool err = false; - ValueIterator iter(values.begin(), values.end()); - if (!reduce(kbuf, ksiz, &iter)) err = true; - return !err; - } - /** Dummy constructor to forbid the use. */ - MapReduce(const MapReduce&); - /** Dummy Operator to forbid the use. */ - MapReduce& operator =(const MapReduce&); - /** The internal database. */ - BasicDB* db_; - /** The record comparator. */ - Comparator* rcomp_; - /** The temporary databases. */ - BasicDB** tmpdbs_; - /** The number of temporary databases. */ - size_t dbnum_; - /** The logical clock for temporary databases. */ - int64_t dbclock_; - /** The number of the mapper threads. */ - size_t mapthnum_; - /** The number of the reducer threads. */ - size_t redthnum_; - /** The number of the flusher threads. */ - size_t flsthnum_; - /** The cache for emitter. */ - TinyHashMap* cache_; - /** The current size of the cache for emitter. */ - AtomicInt64 csiz_; - /** The limit size of the cache for emitter. */ - int64_t clim_; - /** The bucket number of the cache for emitter. */ - int64_t cbnum_; - /** The flush threads. */ - std::deque<FlushThread*>* flsths_; - /** The task queue for parallel reducer. */ - TaskQueue* redtasks_; - /** The flag whether aborted. */ - bool redaborted_; - /** The whole lock. */ - SlottedMutex* rlocks_; -}; - - -/** - * Index database. - * @note This class is designed to implement an indexing storage with an efficient appending - * operation for the existing record values. This class is a wrapper of the polymorphic - * database, featuring buffering mechanism to alleviate IO overhead in the database layer. This - * class can be inherited but overwriting methods is forbidden. Before every database operation, - * it is necessary to call the IndexDB::open method in order to open a database file and connect - * the database object to it. To avoid data missing or corruption, it is important to close - * every database file by the IndexDB::close method when the database is no longer in use. It - * is forbidden for multible database objects in a process to open the same database at the same - * time. It is forbidden to share a database object with child processes. - */ -class IndexDB { - private: - /** The default number of temporary databases. */ - static const size_t DEFDBNUM = 8; - /** The maxinum number of temporary databases. */ - static const size_t MAXDBNUM = 256; - /** The default cache limit size. */ - static const int64_t DEFCLIM = 256LL << 20; - /** The default cache bucket number. */ - static const int64_t DEFCBNUM = 1048583LL; - /** The bucket number of temprary databases. */ - static const int64_t DBBNUM = 512LL << 10; - /** The page size of temprary databases. */ - static const int32_t DBPSIZ = 32768; - /** The mapped size of temprary databases. */ - static const int64_t DBMSIZ = 516LL * 4096; - /** The page cache capacity of temprary databases. */ - static const int64_t DBPCCAP = 16LL << 20; - public: - /** - * Default constructor. - */ - explicit IndexDB() : - db_(), omode_(0), - rcomp_(NULL), tmppath_(""), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0), - cache_(NULL), csiz_(0), clim_(0) { - _assert_(true); - } - /** - * Destructor. - * @note If the database is not closed, it is closed implicitly. - */ - virtual ~IndexDB() { - _assert_(true); - if (omode_ != 0) close(); - } - /** - * Get the last happened error. - * @return the last happened error. - */ - BasicDB::Error error() const { - _assert_(true); - return db_.error(); - } - /** - * Set the error information. - * @param file the file name of the program source code. - * @param line the line number of the program source code. - * @param func the function name of the program source code. - * @param code an error code. - * @param message a supplement message. - */ - void set_error(const char* file, int32_t line, const char* func, - BasicDB::Error::Code code, const char* message) { - _assert_(file && line > 0 && func && message); - db_.set_error(file, line, func, code, message); - } - /** - * Set the error information without source code information. - * @param code an error code. - * @param message a supplement message. - */ - void set_error(BasicDB::Error::Code code, const char* message) { - _assert_(message); - db_.set_error(_KCCODELINE_, code, message); - } - /** - * Open a database file. - * @param path the path of a database file. The same as with PolyDB. In addition, the - * following tuning parameters are supported. "idxclim" specifies the limit size of the - * internal cache. "idxcbnum" the bucket number of the internal cache. "idxdbnum" specifies - * the number of internal databases. "idxtmppath' specifies the path of the temporary - * directory. - * @param mode the connection mode. The same as with PolyDB. - * @return true on success, or false on failure. - * @note Every opened database must be closed by the IndexDB::close method when it is no longer - * in use. It is not allowed for two or more database objects in the same process to keep - * their connections to the same database file at the same time. - */ - bool open(const std::string& path = ":", - uint32_t mode = BasicDB::OWRITER | BasicDB::OCREATE) { - _assert_(true); - // ScopedRWLock lock(&mlock_, true); - if (omode_ != 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "already opened"); - return false; - } - std::vector<std::string> elems; - strsplit(path, '#', &elems); - int64_t clim = 0; - int64_t cbnum = 0; - size_t dbnum = 0; - std::string tmppath = ""; - std::vector<std::string>::iterator it = elems.begin(); - std::vector<std::string>::iterator itend = elems.end(); - if (it != itend) ++it; - while (it != itend) { - std::vector<std::string> fields; - if (strsplit(*it, '=', &fields) > 1) { - const char* key = fields[0].c_str(); - const char* value = fields[1].c_str(); - if (!std::strcmp(key, "idxclim") || !std::strcmp(key, "idxcachelimit")) { - clim = atoix(value); - } else if (!std::strcmp(key, "idxcbnum") || !std::strcmp(key, "idxcachebuckets")) { - cbnum = atoix(value); - } else if (!std::strcmp(key, "idxdbnum")) { - dbnum = atoix(value); - } else if (!std::strcmp(key, "idxtmppath")) { - tmppath = value; - } - } - ++it; - } - if (!db_.open(path, mode)) return false; - tmppath_ = tmppath; - rcomp_ = LEXICALCOMP; - BasicDB* idb = &db_; - if (typeid(db_) == typeid(PolyDB)) { - PolyDB* pdb = (PolyDB*)idb; - idb = pdb->reveal_inner_db(); - } - const std::type_info& info = typeid(*idb); - if (info == typeid(GrassDB)) { - GrassDB* gdb = (GrassDB*)idb; - rcomp_ = gdb->rcomp(); - } else if (info == typeid(TreeDB)) { - TreeDB* tdb = (TreeDB*)idb; - rcomp_ = tdb->rcomp(); - } else if (info == typeid(ForestDB)) { - ForestDB* fdb = (ForestDB*)idb; - rcomp_ = fdb->rcomp(); - } - dbnum_ = dbnum < MAXDBNUM ? dbnum : MAXDBNUM; - dbclock_ = 0; - if ((mode & BasicDB::OWRITER) && dbnum > 0) { - tmpdbs_ = new BasicDB*[dbnum_]; - if (tmppath_.empty()) { - report(_KCCODELINE_, "started to open temporary databases on memory"); - double stime = time(); - for (size_t i = 0; i < dbnum_; i++) { - GrassDB* gdb = new GrassDB; - gdb->tune_options(GrassDB::TCOMPRESS); - gdb->tune_buckets(DBBNUM / 2); - gdb->tune_page(DBPSIZ); - gdb->tune_page_cache(DBPCCAP); - gdb->tune_comparator(rcomp_); - gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE); - tmpdbs_[i] = gdb; - } - double etime = time(); - report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime); - } else { - File::Status sbuf; - if (!File::status(tmppath_, &sbuf) || !sbuf.isdir) { - set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory"); - delete[] tmpdbs_; - db_.close(); - return false; - } - report(_KCCODELINE_, "started to open temporary databases under %s", tmppath.c_str()); - double stime = time(); - uint32_t pid = getpid() & UINT16MAX; - uint32_t tid = Thread::hash() & UINT16MAX; - uint32_t ts = time() * 1000; - bool err = false; - for (size_t i = 0; i < dbnum_; i++) { - std::string childpath = - strprintf("%s%cidx-%04x-%04x-%08x-%03d%ckct", - tmppath_.c_str(), File::PATHCHR, pid, tid, ts, - (int)(i + 1), File::EXTCHR); - TreeDB* tdb = new TreeDB; - tdb->tune_options(TreeDB::TSMALL | TreeDB::TLINEAR); - tdb->tune_buckets(DBBNUM); - tdb->tune_page(DBPSIZ); - tdb->tune_map(DBMSIZ); - tdb->tune_page_cache(DBPCCAP); - tdb->tune_comparator(rcomp_); - if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) { - const BasicDB::Error& e = tdb->error(); - set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - tmpdbs_[i] = tdb; - } - double etime = time(); - report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime); - if (err) { - for (size_t i = 0; i < dbnum_; i++) { - delete tmpdbs_[i]; - } - delete[] tmpdbs_; - db_.close(); - return false; - } - } - } else { - tmpdbs_ = NULL; - } - if (mode & BasicDB::OWRITER) { - cache_ = new TinyHashMap(cbnum > 0 ? cbnum : DEFCBNUM); - } else { - cache_ = NULL; - } - clim_ = clim > 0 ? clim : DEFCLIM; - csiz_ = 0; - omode_ = mode; - return true; - } - /** - * Close the database file. - * @return true on success, or false on failure. - */ - bool close() { - _assert_(true); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - bool err = false; - if (cache_) { - if (!flush_cache()) err = true; - delete cache_; - if (tmpdbs_) { - if (!merge_tmpdbs()) err = true; - report(_KCCODELINE_, "closing the temporary databases"); - double stime = time(); - for (size_t i = 0; i < dbnum_; i++) { - BasicDB* tmpdb = tmpdbs_[i]; - const std::string& path = tmpdb->path(); - if (!tmpdb->close()) { - const BasicDB::Error& e = tmpdb->error(); - set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - if (!tmppath_.empty()) File::remove(path); - delete tmpdb; - } - double etime = time(); - report(_KCCODELINE_, "closing the temporary databases finished: %.6f", etime - stime); - delete[] tmpdbs_; - } - } - if (!db_.close()) err = true; - omode_ = 0; - return !err; - } - /** - * Set the value of a record. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param vbuf the pointer to the value region. - * @param vsiz the size of the value region. - * @return true on success, or false on failure. - * @note If no record corresponds to the key, a new record is created. If the corresponding - * record exists, the value is overwritten. - */ - bool set(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - if (!cache_) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); - return false; - } - bool err = false; - if (!clean_dbs(kbuf, ksiz)) err = true; - cache_->set(kbuf, ksiz, vbuf, vsiz); - csiz_ += ksiz + vsiz; - if (csiz_ > clim_ && !flush_cache()) err = false; - return !err; - } - /** - * Set the value of a record. - * @note Equal to the original DB::set method except that the parameters are std::string. - */ - bool set(const std::string& key, const std::string& value) { - _assert_(true); - return set(key.c_str(), key.size(), value.c_str(), value.size()); - } - /** - * Add a record. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param vbuf the pointer to the value region. - * @param vsiz the size of the value region. - * @return true on success, or false on failure. - * @note If no record corresponds to the key, a new record is created. If the corresponding - * record exists, the record is not modified and false is returned. - */ - bool add(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - if (!cache_) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); - return false; - } - if (check_impl(kbuf, ksiz)) { - set_error(_KCCODELINE_, BasicDB::Error::DUPREC, "record duplication"); - return false; - } - bool err = false; - cache_->set(kbuf, ksiz, vbuf, vsiz); - csiz_ += ksiz + vsiz; - if (csiz_ > clim_ && !flush_cache()) err = false; - return !err; - } - /** - * Set the value of a record. - * @note Equal to the original DB::add method except that the parameters are std::string. - */ - bool add(const std::string& key, const std::string& value) { - _assert_(true); - return add(key.c_str(), key.size(), value.c_str(), value.size()); - } - /** - * Replace the value of a record. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param vbuf the pointer to the value region. - * @param vsiz the size of the value region. - * @return true on success, or false on failure. - * @note If no record corresponds to the key, no new record is created and false is returned. - * If the corresponding record exists, the value is modified. - */ - bool replace(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - if (!cache_) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); - return false; - } - if (!check_impl(kbuf, ksiz)) { - set_error(_KCCODELINE_, BasicDB::Error::NOREC, "no record"); - return false; - } - bool err = false; - if (!clean_dbs(kbuf, ksiz)) err = true; - cache_->set(kbuf, ksiz, vbuf, vsiz); - csiz_ += ksiz + vsiz; - if (csiz_ > clim_ && !flush_cache()) err = false; - return !err; - } - /** - * Replace the value of a record. - * @note Equal to the original DB::replace method except that the parameters are std::string. - */ - bool replace(const std::string& key, const std::string& value) { - _assert_(true); - return replace(key.c_str(), key.size(), value.c_str(), value.size()); - } - /** - * Append the value of a record. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param vbuf the pointer to the value region. - * @param vsiz the size of the value region. - * @return true on success, or false on failure. - * @note If no record corresponds to the key, a new record is created. If the corresponding - * record exists, the given value is appended at the end of the existing value. - */ - bool append(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - if (!cache_) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); - return false; - } - bool err = false; - cache_->append(kbuf, ksiz, vbuf, vsiz); - csiz_ += ksiz + vsiz; - if (csiz_ > clim_ && !flush_cache()) err = false; - return !err; - } - /** - * Set the value of a record. - * @note Equal to the original DB::append method except that the parameters are std::string. - */ - bool append(const std::string& key, const std::string& value) { - _assert_(true); - return append(key.c_str(), key.size(), value.c_str(), value.size()); - } - /** - * Remove a record. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @return true on success, or false on failure. - * @note If no record corresponds to the key, false is returned. - */ - bool remove(const char* kbuf, size_t ksiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - if (!cache_) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); - return false; - } - bool err = false; - if (!clean_dbs(kbuf, ksiz)) err = true; - cache_->remove(kbuf, ksiz); - return !err; - } - /** - * Remove a record. - * @note Equal to the original DB::remove method except that the parameter is std::string. - */ - bool remove(const std::string& key) { - _assert_(true); - return remove(key.c_str(), key.size()); - } - /** - * Retrieve the value of a record. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @param sp the pointer to the variable into which the size of the region of the return - * value is assigned. - * @return the pointer to the value region of the corresponding record, or NULL on failure. - * @note If no record corresponds to the key, NULL is returned. Because an additional zero - * code is appended at the end of the region of the return value, the return value can be - * treated as a C-style string. Because the region of the return value is allocated with the - * the new[] operator, it should be released with the delete[] operator when it is no longer - * in use. - */ - char* get(const char* kbuf, size_t ksiz, size_t* sp) { - _assert_(kbuf && ksiz <= MEMMAXSIZ && sp); - // ScopedRWLock lock(&mlock_, false); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - *sp = 0; - return false; - } - if (!cache_) return db_.get(kbuf, ksiz, sp); - size_t dvsiz = 0; - char* dvbuf = db_.get(kbuf, ksiz, &dvsiz); - size_t cvsiz = 0; - const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz); - struct Record { - char* buf; - size_t size; - }; - Record* recs = NULL; - bool hit = false; - size_t rsiz = 0; - if (tmpdbs_) { - recs = new Record[dbnum_]; - for (size_t i = 0; i < dbnum_; i++) { - BasicDB* tmpdb = tmpdbs_[i]; - Record* rp = recs + i; - rp->buf = tmpdb->get(kbuf, ksiz, &rp->size); - if (rp->buf) { - rsiz += rp->size; - hit = true; - } - } - } - if (!hit) { - delete[] recs; - if (!dvbuf && !cvbuf) { - *sp = 0; - return NULL; - } - if (!dvbuf) { - dvbuf = new char[cvsiz+1]; - std::memcpy(dvbuf, cvbuf, cvsiz); - *sp = cvsiz; - return dvbuf; - } - if (!cvbuf) { - *sp = dvsiz; - return dvbuf; - } - char* rbuf = new char[dvsiz+cvsiz+1]; - std::memcpy(rbuf, dvbuf, dvsiz); - std::memcpy(rbuf + dvsiz, cvbuf, cvsiz); - delete[] dvbuf; - *sp = dvsiz + cvsiz; - return rbuf; - } - if (dvbuf) rsiz += dvsiz; - if (cvbuf) rsiz += cvsiz; - char* rbuf = new char[rsiz+1]; - char* wp = rbuf; - if (dvbuf) { - std::memcpy(wp, dvbuf, dvsiz); - wp += dvsiz; - delete[] dvbuf; - } - if (cvbuf) { - std::memcpy(wp, cvbuf, cvsiz); - wp += cvsiz; - } - if (recs) { - for (size_t i = 0; i < dbnum_; i++) { - Record* rp = recs + i; - if (rp->buf) { - std::memcpy(wp, rp->buf, rp->size); - wp += rp->size; - delete[] rp->buf; - } - } - delete[] recs; - } - *sp = rsiz; - return rbuf; - } - /** - * Retrieve the value of a record. - * @note Equal to the original DB::get method except that the first parameters is the key - * string and the second parameter is a string to contain the result and the return value is - * bool for success. - */ - bool get(const std::string& key, std::string* value) { - _assert_(value); - size_t vsiz; - char* vbuf = get(key.c_str(), key.size(), &vsiz); - if (!vbuf) return false; - value->clear(); - value->append(vbuf, vsiz); - delete[] vbuf; - return true; - } - /** - * Synchronize updated contents with the file and the device. - * @param hard true for physical synchronization with the device, or false for logical - * synchronization with the file system. - * @param proc a postprocessor object. If it is NULL, no postprocessing is performed. - * @return true on success, or false on failure. - * @note The operation of the postprocessor is performed atomically and other threads accessing - * the same record are blocked. To avoid deadlock, any explicit database operation must not - * be performed in this function. - */ - bool synchronize(bool hard = false, BasicDB::FileProcessor* proc = NULL) { - _assert_(true); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - if (!cache_) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); - return false; - } - bool err = false; - if (!flush_cache()) err = true; - if (tmpdbs_ && !merge_tmpdbs()) err = true; - if (!db_.synchronize(hard, proc)) err = true; - return !err; - } - /** - * Remove all records. - * @return true on success, or false on failure. - */ - bool clear() { - _assert_(true); - // ScopedRWLock lock(&mlock_, true); - if (omode_ == 0) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); - return false; - } - if (!cache_) { - set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); - return false; - } - cache_->clear(); - csiz_ = 0; - return db_.clear(); - } - /** - * Get the number of records. - * @return the number of records, or -1 on failure. - */ - int64_t count() { - _assert_(true); - // ScopedRWLock lock(&mlock_, false); - return count_impl(); - } - /** - * Get the size of the database file. - * @return the size of the database file in bytes, or -1 on failure. - */ - int64_t size() { - _assert_(true); - // ScopedRWLock lock(&mlock_, false); - return size_impl(); - } - /** - * Get the path of the database file. - * @return the path of the database file, or an empty string on failure. - */ - std::string path() { - _assert_(true); - return db_.path(); - } - /** - * Get the miscellaneous status information. - * @param strmap a string map to contain the result. - * @return true on success, or false on failure. - */ - bool status(std::map<std::string, std::string>* strmap) { - _assert_(strmap); - return db_.status(strmap); - } - /** - * Reveal the inner database object. - * @return the inner database object, or NULL on failure. - */ - PolyDB* reveal_inner_db() { - _assert_(true); - return &db_; - } - /** - * Create a cursor object. - * @return the return value is the created cursor object. - * @note Because the object of the return value is allocated by the constructor, it should be - * released with the delete operator when it is no longer in use. - */ - BasicDB::Cursor* cursor() { - _assert_(true); - return db_.cursor(); - } - /** - * Write a log message. - * @param file the file name of the program source code. - * @param line the line number of the program source code. - * @param func the function name of the program source code. - * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal - * information, Logger::WARN for warning, and Logger::ERROR for fatal error. - * @param message the supplement message. - */ - void log(const char* file, int32_t line, const char* func, BasicDB::Logger::Kind kind, - const char* message) { - _assert_(file && line > 0 && func && message); - db_.log(file, line, func, kind, message); - } - /** - * Set the internal logger. - * @param logger the logger object. - * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging, - * Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal - * error. - * @return true on success, or false on failure. - */ - bool tune_logger(BasicDB::Logger* logger, - uint32_t kinds = BasicDB::Logger::WARN | BasicDB::Logger::ERROR) { - _assert_(logger); - return db_.tune_logger(logger, kinds); - } - /** - * Set the internal meta operation trigger. - * @param trigger the trigger object. - * @return true on success, or false on failure. - */ - bool tune_meta_trigger(BasicDB::MetaTrigger* trigger) { - _assert_(trigger); - return db_.tune_meta_trigger(trigger); - } - protected: - /** - * Report a message for debugging. - * @param file the file name of the program source code. - * @param line the line number of the program source code. - * @param func the function name of the program source code. - * @param format the printf-like format string. - * @param ... used according to the format string. - */ - void report(const char* file, int32_t line, const char* func, const char* format, ...) { - _assert_(file && line > 0 && func && format); - std::string message; - va_list ap; - va_start(ap, format); - vstrprintf(&message, format, ap); - va_end(ap); - db_.log(file, line, func, BasicDB::Logger::INFO, message.c_str()); - } - private: - /** - * Flush all cache records. - * @return true on success, or false on failure. - */ - bool flush_cache() { - _assert_(true); - bool err = false; - double stime = time(); - report(_KCCODELINE_, "flushing the cache"); - if (tmpdbs_) { - BasicDB* tmpdb = tmpdbs_[dbclock_]; - TinyHashMap::Sorter sorter(cache_); - const char* kbuf, *vbuf; - size_t ksiz, vsiz; - while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { - if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { - const BasicDB::Error& e = tmpdb->error(); - db_.set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - sorter.step(); - } - dbclock_ = (dbclock_ + 1) % dbnum_; - } else { - TinyHashMap::Sorter sorter(cache_); - const char* kbuf, *vbuf; - size_t ksiz, vsiz; - while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { - if (!db_.append(kbuf, ksiz, vbuf, vsiz)) err = true; - sorter.step(); - } - } - cache_->clear(); - csiz_ = 0; - double etime = time(); - report(_KCCODELINE_, "flushing the cache finished: time=%.6f", etime - stime); - return !err; - } - /** - * Merge temporary databases. - * @return true on success, or false on failure. - */ - bool merge_tmpdbs() { - _assert_(true); - bool err = false; - report(_KCCODELINE_, "merging the temporary databases"); - double stime = time(); - if (!db_.merge(tmpdbs_, dbnum_, PolyDB::MAPPEND)) err = true; - dbclock_ = 0; - for (size_t i = 0; i < dbnum_; i++) { - BasicDB* tmpdb = tmpdbs_[i]; - if (!tmpdb->clear()) { - const BasicDB::Error& e = tmpdb->error(); - set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - } - double etime = time(); - report(_KCCODELINE_, "merging the temporary databases finished: %.6f", etime - stime); - return !err; - } - /** - * Remove a record from databases. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @return true on success, or false on failure. - */ - bool clean_dbs(const char* kbuf, size_t ksiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ); - if (db_.remove(kbuf, ksiz)) return true; - bool err = false; - if (db_.error() != BasicDB::Error::NOREC) err = true; - if (tmpdbs_) { - for (size_t i = 0; i < dbnum_; i++) { - BasicDB* tmpdb = tmpdbs_[i]; - if (!tmpdb->remove(kbuf, ksiz)) { - const BasicDB::Error& e = tmpdb->error(); - if (e != BasicDB::Error::NOREC) { - set_error(_KCCODELINE_, e.code(), e.message()); - err = true; - } - } - } - } - return !err; - } - /** - * Check whether a record exists. - * @param kbuf the pointer to the key region. - * @param ksiz the size of the key region. - * @return true if the record exists, or false if not. - */ - bool check_impl(const char* kbuf, size_t ksiz) { - _assert_(kbuf && ksiz <= MEMMAXSIZ); - char vbuf; - if (db_.get(kbuf, ksiz, &vbuf, 1) >= 0) return true; - if (cache_) { - size_t vsiz; - if (cache_->get(kbuf, ksiz, &vsiz)) return true; - if (tmpdbs_) { - for (size_t i = 0; i < dbnum_; i++) { - BasicDB* tmpdb = tmpdbs_[i]; - if (tmpdb->get(kbuf, ksiz, &vbuf, 1)) return true; - } - } - } - return false; - } - /** - * Get the number of records. - * @return the number of records, or -1 on failure. - */ - int64_t count_impl() { - _assert_(true); - int64_t dbcnt = db_.count(); - if (dbcnt < 0) return -1; - if (!cache_) return dbcnt; - int64_t ccnt = cache_->count(); - return dbcnt > ccnt ? dbcnt : ccnt; - } - /** - * Get the size of the database file. - * @return the size of the database file in bytes. - */ - int64_t size_impl() { - _assert_(true); - int64_t dbsiz = db_.size(); - if (dbsiz < 0) return -1; - return dbsiz + csiz_; - } - /** Dummy constructor to forbid the use. */ - IndexDB(const IndexDB&); - /** Dummy Operator to forbid the use. */ - IndexDB& operator =(const IndexDB&); - /** The internal database. */ - PolyDB db_; - /** The open mode. */ - uint32_t omode_; - /** The record comparator. */ - Comparator* rcomp_; - /** The base path of temporary databases. */ - std::string tmppath_; - /** The temporary databases. */ - BasicDB** tmpdbs_; - /** The number of temporary databases. */ - size_t dbnum_; - /** The logical clock for temporary databases. */ - int64_t dbclock_; - /** The internal cache. */ - TinyHashMap* cache_; - /** The current size of the internal cache. */ - int64_t csiz_; - /** The limit size of the internal cache. */ - int64_t clim_; -}; - - -} // common namespace - -#endif // duplication check - -// END OF FILE |
