diff options
Diffstat (limited to 'plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h')
-rw-r--r-- | plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h | 1690 |
1 files changed, 1690 insertions, 0 deletions
diff --git a/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h b/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h new file mode 100644 index 0000000000..001c09a1d4 --- /dev/null +++ b/plugins/Dbx_kyoto/src/kyotocabinet/kcdbext.h @@ -0,0 +1,1690 @@ +/************************************************************************************************* + * Database extension + * Copyright (C) 2009-2012 FAL Labs + * This file is part of Kyoto Cabinet. + * This program is free software: you can redistribute it and/or modify it under the terms of + * the GNU General Public License as published by the Free Software Foundation, either version + * 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * You should have received a copy of the GNU General Public License along with this program. + * If not, see <http://www.gnu.org/licenses/>. + *************************************************************************************************/ + + +#ifndef _KCDBEXT_H // duplication check +#define _KCDBEXT_H + +#include <kccommon.h> +#include <kcutil.h> +#include <kcthread.h> +#include <kcfile.h> +#include <kccompress.h> +#include <kccompare.h> +#include <kcmap.h> +#include <kcregex.h> +#include <kcdb.h> +#include <kcplantdb.h> +#include <kcprotodb.h> +#include <kcstashdb.h> +#include <kccachedb.h> +#include <kchashdb.h> +#include <kcdirdb.h> +#include <kcpolydb.h> + +namespace kyotocabinet { // common namespace + + +/** + * MapReduce framework. + * @note Although this framework is not distributed or concurrent, it is useful for aggregate + * calculation with less CPU loading and less memory usage. + */ +class MapReduce { + public: + class ValueIterator; + private: + class FlushThread; + class ReduceTaskQueue; + class MapVisitor; + struct MergeLine; + /** An alias of vector of loaded values. */ + typedef std::vector<std::string> Values; + /** The default number of temporary databases. */ + static const size_t DEFDBNUM = 8; + /** The maxinum number of temporary databases. */ + static const size_t MAXDBNUM = 256; + /** The default cache limit. */ + static const int64_t DEFCLIM = 512LL << 20; + /** The default cache bucket numer. */ + static const int64_t DEFCBNUM = 1048583LL; + /** The bucket number of temprary databases. */ + static const int64_t DBBNUM = 512LL << 10; + /** The page size of temprary databases. */ + static const int32_t DBPSIZ = 32768; + /** The mapped size of temprary databases. */ + static const int64_t DBMSIZ = 516LL * 4096; + /** The page cache capacity of temprary databases. */ + static const int64_t DBPCCAP = 16LL << 20; + /** The default number of threads in parallel mode. */ + static const size_t DEFTHNUM = 8; + /** The number of slots of the record lock. */ + static const int32_t RLOCKSLOT = 256; + public: + /** + * Value iterator for the reducer. + */ + class ValueIterator { + friend class MapReduce; + public: + /** + * Get the next value. + * @param sp the pointer to the variable into which the size of the region of the return + * value is assigned. + * @return the pointer to the next value region, or NULL if no value remains. + */ + const char* next(size_t* sp) { + _assert_(sp); + if (!vptr_) { + if (vit_ == vend_) return NULL; + vptr_ = vit_->data(); + vsiz_ = vit_->size(); + vit_++; + } + uint64_t vsiz; + size_t step = readvarnum(vptr_, vsiz_, &vsiz); + vptr_ += step; + vsiz_ -= step; + const char* vbuf = vptr_; + *sp = vsiz; + vptr_ += vsiz; + vsiz_ -= vsiz; + if (vsiz_ < 1) vptr_ = NULL; + return vbuf; + } + private: + /** + * Default constructor. + */ + explicit ValueIterator(Values::const_iterator vit, Values::const_iterator vend) : + vit_(vit), vend_(vend), vptr_(NULL), vsiz_(0) { + _assert_(true); + } + /** + * Destructor. + */ + ~ValueIterator() { + _assert_(true); + } + /** Dummy constructor to forbid the use. */ + ValueIterator(const ValueIterator&); + /** Dummy Operator to forbid the use. */ + ValueIterator& operator =(const ValueIterator&); + /** The current iterator of loaded values. */ + Values::const_iterator vit_; + /** The ending iterator of loaded values. */ + Values::const_iterator vend_; + /** The pointer of the current value. */ + const char* vptr_; + /** The size of the current value. */ + size_t vsiz_; + }; + /** + * Execution options. + */ + enum Option { + XNOLOCK = 1 << 0, ///< avoid locking against update operations + XPARAMAP = 1 << 1, ///< run mappers in parallel + XPARARED = 1 << 2, ///< run reducers in parallel + XPARAFLS = 1 << 3, ///< run cache flushers in parallel + XNOCOMP = 1 << 8 ///< avoid compression of temporary databases + }; + /** + * Default constructor. + */ + explicit MapReduce() : + db_(NULL), rcomp_(NULL), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0), + mapthnum_(DEFTHNUM), redthnum_(DEFTHNUM), flsthnum_(DEFTHNUM), + cache_(NULL), csiz_(0), clim_(DEFCLIM), cbnum_(DEFCBNUM), flsths_(NULL), + redtasks_(NULL), redaborted_(false), rlocks_(NULL) { + _assert_(true); + } + /** + * Destructor. + */ + virtual ~MapReduce() { + _assert_(true); + } + /** + * Map a record data. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param vbuf the pointer to the value region. + * @param vsiz the size of the value region. + * @return true on success, or false on failure. + * @note This function can call the MapReduce::emit method to emit a record. To avoid + * deadlock, any explicit database operation must not be performed in this function. + */ + virtual bool map(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) = 0; + /** + * Reduce a record data. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param iter the iterator to get the values. + * @return true on success, or false on failure. + * @note To avoid deadlock, any explicit database operation must not be performed in this + * function. + */ + virtual bool reduce(const char* kbuf, size_t ksiz, ValueIterator* iter) = 0; + /** + * Preprocess the map operations. + * @return true on success, or false on failure. + * @note This function can call the MapReduce::emit method to emit a record. To avoid + * deadlock, any explicit database operation must not be performed in this function. + */ + virtual bool preprocess() { + _assert_(true); + return true; + } + /** + * Mediate between the map and the reduce phases. + * @return true on success, or false on failure. + * @note This function can call the MapReduce::emit method to emit a record. To avoid + * deadlock, any explicit database operation must not be performed in this function. + */ + virtual bool midprocess() { + _assert_(true); + return true; + } + /** + * Postprocess the reduce operations. + * @return true on success, or false on failure. + * @note To avoid deadlock, any explicit database operation must not be performed in this + * function. + */ + virtual bool postprocess() { + _assert_(true); + return true; + } + /** + * Process a log message. + * @param name the name of the event. + * @param message a supplement message. + * @return true on success, or false on failure. + */ + virtual bool log(const char* name, const char* message) { + _assert_(name && message); + return true; + } + /** + * Execute the MapReduce process about a database. + * @param db the source database. + * @param tmppath the path of a directory for the temporary data storage. If it is an empty + * string, temporary data are handled on memory. + * @param opts the optional features by bitwise-or: MapReduce::XNOLOCK to avoid locking + * against update operations by other threads, MapReduce::XPARAMAP to run the mapper in + * parallel, MapReduce::XPARARED to run the reducer in parallel, MapReduce::XNOCOMP to avoid + * compression of temporary databases. + * @return true on success, or false on failure. + */ + bool execute(BasicDB* db, const std::string& tmppath = "", uint32_t opts = 0) { + int64_t count = db->count(); + if (count < 0) { + if (db->error() != BasicDB::Error::NOIMPL) return false; + count = 0; + } + bool err = false; + double stime, etime; + db_ = db; + rcomp_ = LEXICALCOMP; + BasicDB* idb = db; + if (typeid(*db) == typeid(PolyDB)) { + PolyDB* pdb = (PolyDB*)idb; + idb = pdb->reveal_inner_db(); + } + const std::type_info& info = typeid(*idb); + if (info == typeid(GrassDB)) { + GrassDB* gdb = (GrassDB*)idb; + rcomp_ = gdb->rcomp(); + } else if (info == typeid(TreeDB)) { + TreeDB* tdb = (TreeDB*)idb; + rcomp_ = tdb->rcomp(); + } else if (info == typeid(ForestDB)) { + ForestDB* fdb = (ForestDB*)idb; + rcomp_ = fdb->rcomp(); + } + tmpdbs_ = new BasicDB*[dbnum_]; + if (tmppath.empty()) { + if (!logf("prepare", "started to open temporary databases on memory")) err = true; + stime = time(); + for (size_t i = 0; i < dbnum_; i++) { + GrassDB* gdb = new GrassDB; + int32_t myopts = 0; + if (!(opts & XNOCOMP)) myopts |= GrassDB::TCOMPRESS; + gdb->tune_options(myopts); + gdb->tune_buckets(DBBNUM / 2); + gdb->tune_page(DBPSIZ); + gdb->tune_page_cache(DBPCCAP); + gdb->tune_comparator(rcomp_); + gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE); + tmpdbs_[i] = gdb; + } + etime = time(); + if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) + err = true; + if (err) { + delete[] tmpdbs_; + return false; + } + } else { + File::Status sbuf; + if (!File::status(tmppath, &sbuf) || !sbuf.isdir) { + db->set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory"); + delete[] tmpdbs_; + return false; + } + if (!logf("prepare", "started to open temporary databases under %s", tmppath.c_str())) + err = true; + stime = time(); + uint32_t pid = getpid() & UINT16MAX; + uint32_t tid = Thread::hash() & UINT16MAX; + uint32_t ts = time() * 1000; + for (size_t i = 0; i < dbnum_; i++) { + std::string childpath = + strprintf("%s%cmr-%04x-%04x-%08x-%03d%ckct", + tmppath.c_str(), File::PATHCHR, pid, tid, ts, (int)(i + 1), File::EXTCHR); + TreeDB* tdb = new TreeDB; + int32_t myopts = TreeDB::TSMALL | TreeDB::TLINEAR; + if (!(opts & XNOCOMP)) myopts |= TreeDB::TCOMPRESS; + tdb->tune_options(myopts); + tdb->tune_buckets(DBBNUM); + tdb->tune_page(DBPSIZ); + tdb->tune_map(DBMSIZ); + tdb->tune_page_cache(DBPCCAP); + tdb->tune_comparator(rcomp_); + if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) { + const BasicDB::Error& e = tdb->error(); + db->set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + tmpdbs_[i] = tdb; + } + etime = time(); + if (!logf("prepare", "opening temporary databases finished: time=%.6f", etime - stime)) + err = true; + if (err) { + for (size_t i = 0; i < dbnum_; i++) { + delete tmpdbs_[i]; + } + delete[] tmpdbs_; + return false; + } + } + if (opts & XPARARED) redtasks_ = new ReduceTaskQueue; + if (opts & XPARAFLS) flsths_ = new std::deque<FlushThread*>; + if (opts & XNOLOCK) { + MapChecker mapchecker; + MapVisitor mapvisitor(this, &mapchecker, count); + mapvisitor.visit_before(); + if (!err) { + BasicDB::Cursor* cur = db->cursor(); + if (!cur->jump() && cur->error() != BasicDB::Error::NOREC) err = true; + while (!err) { + if (!cur->accept(&mapvisitor, false, true)) { + if (cur->error() != BasicDB::Error::NOREC) err = true; + break; + } + } + delete cur; + } + if (mapvisitor.error()) { + db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); + err = true; + } + mapvisitor.visit_after(); + } else if (opts & XPARAMAP) { + MapChecker mapchecker; + MapVisitor mapvisitor(this, &mapchecker, count); + rlocks_ = new SlottedMutex(RLOCKSLOT); + if (!err && !db->scan_parallel(&mapvisitor, mapthnum_, &mapchecker)) { + db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); + err = true; + } + delete rlocks_; + rlocks_ = NULL; + if (mapvisitor.error()) err = true; + } else { + MapChecker mapchecker; + MapVisitor mapvisitor(this, &mapchecker, count); + if (!err && !db->iterate(&mapvisitor, false, &mapchecker)) err = true; + if (mapvisitor.error()) { + db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "mapper failed"); + err = true; + } + } + if (flsths_) { + delete flsths_; + flsths_ = NULL; + } + if (redtasks_) { + delete redtasks_; + redtasks_ = NULL; + } + if (!logf("clean", "closing the temporary databases")) err = true; + stime = time(); + for (size_t i = 0; i < dbnum_; i++) { + const std::string& path = tmpdbs_[i]->path(); + if (!tmpdbs_[i]->clear()) { + const BasicDB::Error& e = tmpdbs_[i]->error(); + db->set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + if (!tmpdbs_[i]->close()) { + const BasicDB::Error& e = tmpdbs_[i]->error(); + db->set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + if (!tmppath.empty()) File::remove(path); + delete tmpdbs_[i]; + } + etime = time(); + if (!logf("clean", "closing the temporary databases finished: time=%.6f", + etime - stime)) err = true; + delete[] tmpdbs_; + return !err; + } + /** + * Set the storage configurations. + * @param dbnum the number of temporary databases. + * @param clim the limit size of the internal cache. + * @param cbnum the bucket number of the internal cache. + */ + void tune_storage(int32_t dbnum, int64_t clim, int64_t cbnum) { + _assert_(true); + dbnum_ = dbnum > 0 ? dbnum : DEFDBNUM; + if (dbnum_ > MAXDBNUM) dbnum_ = MAXDBNUM; + clim_ = clim > 0 ? clim : DEFCLIM; + cbnum_ = cbnum > 0 ? cbnum : DEFCBNUM; + if (cbnum_ > INT16MAX) cbnum_ = nearbyprime(cbnum_); + } + /** + * Set the thread configurations. + * @param mapthnum the number of threads for the mapper. + * @param redthnum the number of threads for the reducer. + * @param flsthnum the number of threads for the internal flusher. + */ + void tune_thread(int32_t mapthnum, int32_t redthnum, int32_t flsthnum) { + _assert_(true); + mapthnum_ = mapthnum > 0 ? mapthnum : DEFTHNUM; + redthnum_ = redthnum > 0 ? redthnum : DEFTHNUM; + flsthnum_ = flsthnum > 0 ? flsthnum : DEFTHNUM; + } + protected: + /** + * Emit a record from the mapper. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param vbuf the pointer to the value region. + * @param vsiz the size of the value region. + * @return true on success, or false on failure. + */ + bool emit(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); + bool err = false; + size_t rsiz = sizevarnum(vsiz) + vsiz; + char stack[NUMBUFSIZ*4]; + char* rbuf = rsiz > sizeof(stack) ? new char[rsiz] : stack; + char* wp = rbuf; + wp += writevarnum(rbuf, vsiz); + std::memcpy(wp, vbuf, vsiz); + if (rlocks_) { + size_t bidx = TinyHashMap::hash_record(kbuf, ksiz) % cbnum_; + size_t lidx = bidx % RLOCKSLOT; + rlocks_->lock(lidx); + cache_->append(kbuf, ksiz, rbuf, rsiz); + rlocks_->unlock(lidx); + } else { + cache_->append(kbuf, ksiz, rbuf, rsiz); + } + if (rbuf != stack) delete[] rbuf; + csiz_ += sizevarnum(ksiz) + ksiz + rsiz; + return !err; + } + private: + /** + * Cache flusher. + */ + class FlushThread : public Thread { + public: + /** constructor */ + explicit FlushThread(MapReduce* mr, BasicDB* tmpdb, + TinyHashMap* cache, size_t csiz, bool cown) : + mr_(mr), tmpdb_(tmpdb), cache_(cache), csiz_(csiz), cown_(cown), err_(false) {} + /** perform the concrete process */ + void run() { + if (!mr_->logf("map", "started to flushing the cache: count=%lld size=%lld", + (long long)cache_->count(), (long long)csiz_)) err_ = true; + double stime = time(); + BasicDB* tmpdb = tmpdb_; + TinyHashMap* cache = cache_; + bool cown = cown_; + TinyHashMap::Sorter sorter(cache); + const char* kbuf, *vbuf; + size_t ksiz, vsiz; + while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { + if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { + const BasicDB::Error& e = tmpdb->error(); + mr_->db_->set_error(_KCCODELINE_, e.code(), e.message()); + err_ = true; + } + sorter.step(); + if (cown) cache->remove(kbuf, ksiz); + } + double etime = time(); + if (!mr_->logf("map", "flushing the cache finished: time=%.6f", etime - stime)) + err_ = true; + if (cown) delete cache; + } + /** check the error flag. */ + bool error() { + return err_; + } + private: + MapReduce* mr_; ///< driver + BasicDB* tmpdb_; ///< temprary database + TinyHashMap* cache_; ///< cache for emitter + size_t csiz_; ///< current cache size + bool cown_; ///< cache ownership flag + bool err_; ///< error flag + }; + /** + * Task queue for parallel reducer. + */ + class ReduceTaskQueue : public TaskQueue { + public: + /** + * Task for parallel reducer. + */ + class ReduceTask : public Task { + friend class ReduceTaskQueue; + public: + /** constructor */ + explicit ReduceTask(MapReduce* mr, const char* kbuf, size_t ksiz, const Values& values) : + mr_(mr), key_(kbuf, ksiz), values_(values) {} + private: + MapReduce* mr_; ///< driver + std::string key_; ///< key + Values values_; ///< values + }; + /** constructor */ + explicit ReduceTaskQueue() {} + private: + /** process a task */ + void do_task(Task* task) { + ReduceTask* rtask = (ReduceTask*)task; + ValueIterator iter(rtask->values_.begin(), rtask->values_.end()); + if (!rtask->mr_->reduce(rtask->key_.data(), rtask->key_.size(), &iter)) + rtask->mr_->redaborted_ = true; + delete rtask; + } + }; + /** + * Checker for the map process. + */ + class MapChecker : public BasicDB::ProgressChecker { + public: + /** constructor */ + explicit MapChecker() : stop_(false) {} + /** stop the process */ + void stop() { + stop_ = true; + } + /** check whether stopped */ + bool stopped() { + return stop_; + } + private: + /** check whether stopped */ + bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) { + return !stop_; + } + bool stop_; ///< flag for stop + }; + /** + * Visitor for the map process. + */ + class MapVisitor : public BasicDB::Visitor { + public: + /** constructor */ + explicit MapVisitor(MapReduce* mr, MapChecker* checker, int64_t scale) : + mr_(mr), checker_(checker), scale_(scale), stime_(0), err_(false) {} + /** get the error flag */ + bool error() { + return err_; + } + /** preprocess the mappter */ + void visit_before() { + mr_->dbclock_ = 0; + mr_->cache_ = new TinyHashMap(mr_->cbnum_); + mr_->csiz_ = 0; + if (!mr_->preprocess()) err_ = true; + if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; + if (!mr_->logf("map", "started the map process: scale=%lld", (long long)scale_)) + err_ = true; + stime_ = time(); + } + /** postprocess the mappter and call the reducer */ + void visit_after() { + if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; + double etime = time(); + if (!mr_->logf("map", "the map process finished: time=%.6f", etime - stime_)) + err_ = true; + if (!mr_->midprocess()) err_ = true; + if (mr_->csiz_ > 0 && !mr_->flush_cache()) err_ = true; + delete mr_->cache_; + if (mr_->flsths_ && !mr_->flsths_->empty()) { + std::deque<FlushThread*>::iterator flthit = mr_->flsths_->begin(); + std::deque<FlushThread*>::iterator flthitend = mr_->flsths_->end(); + while (flthit != flthitend) { + FlushThread* flth = *flthit; + flth->join(); + if (flth->error()) err_ = true; + delete flth; + ++flthit; + } + } + if (!err_ && !mr_->execute_reduce()) err_ = true; + if (!mr_->postprocess()) err_ = true; + } + private: + /** visit a record */ + const char* visit_full(const char* kbuf, size_t ksiz, + const char* vbuf, size_t vsiz, size_t* sp) { + if (!mr_->map(kbuf, ksiz, vbuf, vsiz)) { + checker_->stop(); + err_ = true; + } + if (mr_->rlocks_) { + if (mr_->csiz_ >= mr_->clim_) { + mr_->rlocks_->lock_all(); + if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { + checker_->stop(); + err_ = true; + } + mr_->rlocks_->unlock_all(); + } + } else { + if (mr_->csiz_ >= mr_->clim_ && !mr_->flush_cache()) { + checker_->stop(); + err_ = true; + } + } + return NOP; + } + MapReduce* mr_; ///< driver + MapChecker* checker_; ///< checker + int64_t scale_; ///< number of records + double stime_; ///< start time + bool err_; ///< error flag + }; + /** + * Front line of a merging list. + */ + struct MergeLine { + BasicDB::Cursor* cur; ///< cursor + Comparator* rcomp; ///< record comparator + char* kbuf; ///< pointer to the key + size_t ksiz; ///< size of the key + const char* vbuf; ///< pointer to the value + size_t vsiz; ///< size of the value + /** comparing operator */ + bool operator <(const MergeLine& right) const { + return rcomp->compare(kbuf, ksiz, right.kbuf, right.ksiz) > 0; + } + }; + /** + * Process a log message. + * @param name the name of the event. + * @param format the printf-like format string. + * @param ... used according to the format string. + * @return true on success, or false on failure. + */ + bool logf(const char* name, const char* format, ...) { + _assert_(name && format); + va_list ap; + va_start(ap, format); + std::string message; + vstrprintf(&message, format, ap); + va_end(ap); + return log(name, message.c_str()); + } + /** + * Flush all cache records. + * @return true on success, or false on failure. + */ + bool flush_cache() { + _assert_(true); + bool err = false; + BasicDB* tmpdb = tmpdbs_[dbclock_]; + dbclock_ = (dbclock_ + 1) % dbnum_; + if (flsths_) { + size_t num = flsths_->size(); + if (num >= flsthnum_ || num >= dbnum_) { + FlushThread* flth = flsths_->front(); + flsths_->pop_front(); + flth->join(); + if (flth->error()) err = true; + delete flth; + } + FlushThread* flth = new FlushThread(this, tmpdb, cache_, csiz_, true); + cache_ = new TinyHashMap(cbnum_); + csiz_ = 0; + flth->start(); + flsths_->push_back(flth); + } else { + FlushThread flth(this, tmpdb, cache_, csiz_, false); + flth.run(); + if (flth.error()) err = true; + cache_->clear(); + csiz_ = 0; + } + return !err; + } + /** + * Execute the reduce part. + * @return true on success, or false on failure. + */ + bool execute_reduce() { + bool err = false; + int64_t scale = 0; + for (size_t i = 0; i < dbnum_; i++) { + scale += tmpdbs_[i]->count(); + } + if (!logf("reduce", "started the reduce process: scale=%lld", (long long)scale)) err = true; + double stime = time(); + if (redtasks_) redtasks_->start(redthnum_); + std::priority_queue<MergeLine> lines; + for (size_t i = 0; i < dbnum_; i++) { + MergeLine line; + line.cur = tmpdbs_[i]->cursor(); + line.rcomp = rcomp_; + line.cur->jump(); + line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); + if (line.kbuf) { + lines.push(line); + } else { + delete line.cur; + } + } + char* lkbuf = NULL; + size_t lksiz = 0; + Values values; + while (!err && !lines.empty()) { + MergeLine line = lines.top(); + lines.pop(); + if (lkbuf && (lksiz != line.ksiz || std::memcmp(lkbuf, line.kbuf, lksiz))) { + if (!call_reducer(lkbuf, lksiz, values)) { + db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed"); + err = true; + } + values.clear(); + } + delete[] lkbuf; + lkbuf = line.kbuf; + lksiz = line.ksiz; + values.push_back(std::string(line.vbuf, line.vsiz)); + line.kbuf = line.cur->get(&line.ksiz, &line.vbuf, &line.vsiz, true); + if (line.kbuf) { + lines.push(line); + } else { + delete line.cur; + } + } + if (lkbuf) { + if (!err && !call_reducer(lkbuf, lksiz, values)) { + db_->set_error(_KCCODELINE_, BasicDB::Error::LOGIC, "reducer failed"); + err = true; + } + delete[] lkbuf; + } + while (!lines.empty()) { + MergeLine line = lines.top(); + lines.pop(); + delete[] line.kbuf; + delete line.cur; + } + if (redtasks_) redtasks_->finish(); + double etime = time(); + if (!logf("reduce", "the reduce process finished: time=%.6f", etime - stime)) err = true; + return !err; + } + /** + * Call the reducer. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param values a vector of the values. + * @return true on success, or false on failure. + */ + bool call_reducer(const char* kbuf, size_t ksiz, const Values& values) { + _assert_(kbuf && ksiz <= MEMMAXSIZ); + if (redtasks_) { + if (redaborted_) return false; + ReduceTaskQueue::ReduceTask* task = + new ReduceTaskQueue::ReduceTask(this, kbuf, ksiz, values); + redtasks_->add_task(task); + return true; + } + bool err = false; + ValueIterator iter(values.begin(), values.end()); + if (!reduce(kbuf, ksiz, &iter)) err = true; + return !err; + } + /** Dummy constructor to forbid the use. */ + MapReduce(const MapReduce&); + /** Dummy Operator to forbid the use. */ + MapReduce& operator =(const MapReduce&); + /** The internal database. */ + BasicDB* db_; + /** The record comparator. */ + Comparator* rcomp_; + /** The temporary databases. */ + BasicDB** tmpdbs_; + /** The number of temporary databases. */ + size_t dbnum_; + /** The logical clock for temporary databases. */ + int64_t dbclock_; + /** The number of the mapper threads. */ + size_t mapthnum_; + /** The number of the reducer threads. */ + size_t redthnum_; + /** The number of the flusher threads. */ + size_t flsthnum_; + /** The cache for emitter. */ + TinyHashMap* cache_; + /** The current size of the cache for emitter. */ + AtomicInt64 csiz_; + /** The limit size of the cache for emitter. */ + int64_t clim_; + /** The bucket number of the cache for emitter. */ + int64_t cbnum_; + /** The flush threads. */ + std::deque<FlushThread*>* flsths_; + /** The task queue for parallel reducer. */ + TaskQueue* redtasks_; + /** The flag whether aborted. */ + bool redaborted_; + /** The whole lock. */ + SlottedMutex* rlocks_; +}; + + +/** + * Index database. + * @note This class is designed to implement an indexing storage with an efficient appending + * operation for the existing record values. This class is a wrapper of the polymorphic + * database, featuring buffering mechanism to alleviate IO overhead in the database layer. This + * class can be inherited but overwriting methods is forbidden. Before every database operation, + * it is necessary to call the IndexDB::open method in order to open a database file and connect + * the database object to it. To avoid data missing or corruption, it is important to close + * every database file by the IndexDB::close method when the database is no longer in use. It + * is forbidden for multible database objects in a process to open the same database at the same + * time. It is forbidden to share a database object with child processes. + */ +class IndexDB { + private: + /** The default number of temporary databases. */ + static const size_t DEFDBNUM = 8; + /** The maxinum number of temporary databases. */ + static const size_t MAXDBNUM = 256; + /** The default cache limit size. */ + static const int64_t DEFCLIM = 256LL << 20; + /** The default cache bucket number. */ + static const int64_t DEFCBNUM = 1048583LL; + /** The bucket number of temprary databases. */ + static const int64_t DBBNUM = 512LL << 10; + /** The page size of temprary databases. */ + static const int32_t DBPSIZ = 32768; + /** The mapped size of temprary databases. */ + static const int64_t DBMSIZ = 516LL * 4096; + /** The page cache capacity of temprary databases. */ + static const int64_t DBPCCAP = 16LL << 20; + public: + /** + * Default constructor. + */ + explicit IndexDB() : + mlock_(), db_(), omode_(0), + rcomp_(NULL), tmppath_(""), tmpdbs_(NULL), dbnum_(DEFDBNUM), dbclock_(0), + cache_(NULL), csiz_(0), clim_(0) { + _assert_(true); + } + /** + * Destructor. + * @note If the database is not closed, it is closed implicitly. + */ + virtual ~IndexDB() { + _assert_(true); + if (omode_ != 0) close(); + } + /** + * Get the last happened error. + * @return the last happened error. + */ + BasicDB::Error error() const { + _assert_(true); + return db_.error(); + } + /** + * Set the error information. + * @param file the file name of the program source code. + * @param line the line number of the program source code. + * @param func the function name of the program source code. + * @param code an error code. + * @param message a supplement message. + */ + void set_error(const char* file, int32_t line, const char* func, + BasicDB::Error::Code code, const char* message) { + _assert_(file && line > 0 && func && message); + db_.set_error(file, line, func, code, message); + } + /** + * Set the error information without source code information. + * @param code an error code. + * @param message a supplement message. + */ + void set_error(BasicDB::Error::Code code, const char* message) { + _assert_(message); + db_.set_error(_KCCODELINE_, code, message); + } + /** + * Open a database file. + * @param path the path of a database file. The same as with PolyDB. In addition, the + * following tuning parameters are supported. "idxclim" specifies the limit size of the + * internal cache. "idxcbnum" the bucket number of the internal cache. "idxdbnum" specifies + * the number of internal databases. "idxtmppath' specifies the path of the temporary + * directory. + * @param mode the connection mode. The same as with PolyDB. + * @return true on success, or false on failure. + * @note Every opened database must be closed by the IndexDB::close method when it is no longer + * in use. It is not allowed for two or more database objects in the same process to keep + * their connections to the same database file at the same time. + */ + bool open(const std::string& path = ":", + uint32_t mode = BasicDB::OWRITER | BasicDB::OCREATE) { + _assert_(true); + ScopedRWLock lock(&mlock_, true); + if (omode_ != 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "already opened"); + return false; + } + std::vector<std::string> elems; + strsplit(path, '#', &elems); + int64_t clim = 0; + int64_t cbnum = 0; + size_t dbnum = 0; + std::string tmppath = ""; + std::vector<std::string>::iterator it = elems.begin(); + std::vector<std::string>::iterator itend = elems.end(); + if (it != itend) ++it; + while (it != itend) { + std::vector<std::string> fields; + if (strsplit(*it, '=', &fields) > 1) { + const char* key = fields[0].c_str(); + const char* value = fields[1].c_str(); + if (!std::strcmp(key, "idxclim") || !std::strcmp(key, "idxcachelimit")) { + clim = atoix(value); + } else if (!std::strcmp(key, "idxcbnum") || !std::strcmp(key, "idxcachebuckets")) { + cbnum = atoix(value); + } else if (!std::strcmp(key, "idxdbnum")) { + dbnum = atoix(value); + } else if (!std::strcmp(key, "idxtmppath")) { + tmppath = value; + } + } + ++it; + } + if (!db_.open(path, mode)) return false; + tmppath_ = tmppath; + rcomp_ = LEXICALCOMP; + BasicDB* idb = &db_; + if (typeid(db_) == typeid(PolyDB)) { + PolyDB* pdb = (PolyDB*)idb; + idb = pdb->reveal_inner_db(); + } + const std::type_info& info = typeid(*idb); + if (info == typeid(GrassDB)) { + GrassDB* gdb = (GrassDB*)idb; + rcomp_ = gdb->rcomp(); + } else if (info == typeid(TreeDB)) { + TreeDB* tdb = (TreeDB*)idb; + rcomp_ = tdb->rcomp(); + } else if (info == typeid(ForestDB)) { + ForestDB* fdb = (ForestDB*)idb; + rcomp_ = fdb->rcomp(); + } + dbnum_ = dbnum < MAXDBNUM ? dbnum : MAXDBNUM; + dbclock_ = 0; + if ((mode & BasicDB::OWRITER) && dbnum > 0) { + tmpdbs_ = new BasicDB*[dbnum_]; + if (tmppath_.empty()) { + report(_KCCODELINE_, "started to open temporary databases on memory"); + double stime = time(); + for (size_t i = 0; i < dbnum_; i++) { + GrassDB* gdb = new GrassDB; + gdb->tune_options(GrassDB::TCOMPRESS); + gdb->tune_buckets(DBBNUM / 2); + gdb->tune_page(DBPSIZ); + gdb->tune_page_cache(DBPCCAP); + gdb->tune_comparator(rcomp_); + gdb->open("%", GrassDB::OWRITER | GrassDB::OCREATE | GrassDB::OTRUNCATE); + tmpdbs_[i] = gdb; + } + double etime = time(); + report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime); + } else { + File::Status sbuf; + if (!File::status(tmppath_, &sbuf) || !sbuf.isdir) { + set_error(_KCCODELINE_, BasicDB::Error::NOREPOS, "no such directory"); + delete[] tmpdbs_; + db_.close(); + return false; + } + report(_KCCODELINE_, "started to open temporary databases under %s", tmppath.c_str()); + double stime = time(); + uint32_t pid = getpid() & UINT16MAX; + uint32_t tid = Thread::hash() & UINT16MAX; + uint32_t ts = time() * 1000; + bool err = false; + for (size_t i = 0; i < dbnum_; i++) { + std::string childpath = + strprintf("%s%cidx-%04x-%04x-%08x-%03d%ckct", + tmppath_.c_str(), File::PATHCHR, pid, tid, ts, + (int)(i + 1), File::EXTCHR); + TreeDB* tdb = new TreeDB; + tdb->tune_options(TreeDB::TSMALL | TreeDB::TLINEAR); + tdb->tune_buckets(DBBNUM); + tdb->tune_page(DBPSIZ); + tdb->tune_map(DBMSIZ); + tdb->tune_page_cache(DBPCCAP); + tdb->tune_comparator(rcomp_); + if (!tdb->open(childpath, TreeDB::OWRITER | TreeDB::OCREATE | TreeDB::OTRUNCATE)) { + const BasicDB::Error& e = tdb->error(); + set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + tmpdbs_[i] = tdb; + } + double etime = time(); + report(_KCCODELINE_, "opening temporary databases finished: time=%.6f", etime - stime); + if (err) { + for (size_t i = 0; i < dbnum_; i++) { + delete tmpdbs_[i]; + } + delete[] tmpdbs_; + db_.close(); + return false; + } + } + } else { + tmpdbs_ = NULL; + } + if (mode & BasicDB::OWRITER) { + cache_ = new TinyHashMap(cbnum > 0 ? cbnum : DEFCBNUM); + } else { + cache_ = NULL; + } + clim_ = clim > 0 ? clim : DEFCLIM; + csiz_ = 0; + omode_ = mode; + return true; + } + /** + * Close the database file. + * @return true on success, or false on failure. + */ + bool close() { + _assert_(true); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + bool err = false; + if (cache_) { + if (!flush_cache()) err = true; + delete cache_; + if (tmpdbs_) { + if (!merge_tmpdbs()) err = true; + report(_KCCODELINE_, "closing the temporary databases"); + double stime = time(); + for (size_t i = 0; i < dbnum_; i++) { + BasicDB* tmpdb = tmpdbs_[i]; + const std::string& path = tmpdb->path(); + if (!tmpdb->close()) { + const BasicDB::Error& e = tmpdb->error(); + set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + if (!tmppath_.empty()) File::remove(path); + delete tmpdb; + } + double etime = time(); + report(_KCCODELINE_, "closing the temporary databases finished: %.6f", etime - stime); + delete[] tmpdbs_; + } + } + if (!db_.close()) err = true; + omode_ = 0; + return !err; + } + /** + * Set the value of a record. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param vbuf the pointer to the value region. + * @param vsiz the size of the value region. + * @return true on success, or false on failure. + * @note If no record corresponds to the key, a new record is created. If the corresponding + * record exists, the value is overwritten. + */ + bool set(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + if (!cache_) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); + return false; + } + bool err = false; + if (!clean_dbs(kbuf, ksiz)) err = true; + cache_->set(kbuf, ksiz, vbuf, vsiz); + csiz_ += ksiz + vsiz; + if (csiz_ > clim_ && !flush_cache()) err = false; + return !err; + } + /** + * Set the value of a record. + * @note Equal to the original DB::set method except that the parameters are std::string. + */ + bool set(const std::string& key, const std::string& value) { + _assert_(true); + return set(key.c_str(), key.size(), value.c_str(), value.size()); + } + /** + * Add a record. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param vbuf the pointer to the value region. + * @param vsiz the size of the value region. + * @return true on success, or false on failure. + * @note If no record corresponds to the key, a new record is created. If the corresponding + * record exists, the record is not modified and false is returned. + */ + bool add(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + if (!cache_) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); + return false; + } + if (check_impl(kbuf, ksiz)) { + set_error(_KCCODELINE_, BasicDB::Error::DUPREC, "record duplication"); + return false; + } + bool err = false; + cache_->set(kbuf, ksiz, vbuf, vsiz); + csiz_ += ksiz + vsiz; + if (csiz_ > clim_ && !flush_cache()) err = false; + return !err; + } + /** + * Set the value of a record. + * @note Equal to the original DB::add method except that the parameters are std::string. + */ + bool add(const std::string& key, const std::string& value) { + _assert_(true); + return add(key.c_str(), key.size(), value.c_str(), value.size()); + } + /** + * Replace the value of a record. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param vbuf the pointer to the value region. + * @param vsiz the size of the value region. + * @return true on success, or false on failure. + * @note If no record corresponds to the key, no new record is created and false is returned. + * If the corresponding record exists, the value is modified. + */ + bool replace(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + if (!cache_) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); + return false; + } + if (!check_impl(kbuf, ksiz)) { + set_error(_KCCODELINE_, BasicDB::Error::NOREC, "no record"); + return false; + } + bool err = false; + if (!clean_dbs(kbuf, ksiz)) err = true; + cache_->set(kbuf, ksiz, vbuf, vsiz); + csiz_ += ksiz + vsiz; + if (csiz_ > clim_ && !flush_cache()) err = false; + return !err; + } + /** + * Replace the value of a record. + * @note Equal to the original DB::replace method except that the parameters are std::string. + */ + bool replace(const std::string& key, const std::string& value) { + _assert_(true); + return replace(key.c_str(), key.size(), value.c_str(), value.size()); + } + /** + * Append the value of a record. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param vbuf the pointer to the value region. + * @param vsiz the size of the value region. + * @return true on success, or false on failure. + * @note If no record corresponds to the key, a new record is created. If the corresponding + * record exists, the given value is appended at the end of the existing value. + */ + bool append(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + if (!cache_) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); + return false; + } + bool err = false; + cache_->append(kbuf, ksiz, vbuf, vsiz); + csiz_ += ksiz + vsiz; + if (csiz_ > clim_ && !flush_cache()) err = false; + return !err; + } + /** + * Set the value of a record. + * @note Equal to the original DB::append method except that the parameters are std::string. + */ + bool append(const std::string& key, const std::string& value) { + _assert_(true); + return append(key.c_str(), key.size(), value.c_str(), value.size()); + } + /** + * Remove a record. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @return true on success, or false on failure. + * @note If no record corresponds to the key, false is returned. + */ + bool remove(const char* kbuf, size_t ksiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + if (!cache_) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); + return false; + } + bool err = false; + if (!clean_dbs(kbuf, ksiz)) err = true; + cache_->remove(kbuf, ksiz); + return !err; + } + /** + * Remove a record. + * @note Equal to the original DB::remove method except that the parameter is std::string. + */ + bool remove(const std::string& key) { + _assert_(true); + return remove(key.c_str(), key.size()); + } + /** + * Retrieve the value of a record. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @param sp the pointer to the variable into which the size of the region of the return + * value is assigned. + * @return the pointer to the value region of the corresponding record, or NULL on failure. + * @note If no record corresponds to the key, NULL is returned. Because an additional zero + * code is appended at the end of the region of the return value, the return value can be + * treated as a C-style string. Because the region of the return value is allocated with the + * the new[] operator, it should be released with the delete[] operator when it is no longer + * in use. + */ + char* get(const char* kbuf, size_t ksiz, size_t* sp) { + _assert_(kbuf && ksiz <= MEMMAXSIZ && sp); + ScopedRWLock lock(&mlock_, false); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + *sp = 0; + return false; + } + if (!cache_) return db_.get(kbuf, ksiz, sp); + size_t dvsiz = 0; + char* dvbuf = db_.get(kbuf, ksiz, &dvsiz); + size_t cvsiz = 0; + const char* cvbuf = cache_->get(kbuf, ksiz, &cvsiz); + struct Record { + char* buf; + size_t size; + }; + Record* recs = NULL; + bool hit = false; + size_t rsiz = 0; + if (tmpdbs_) { + recs = new Record[dbnum_]; + for (size_t i = 0; i < dbnum_; i++) { + BasicDB* tmpdb = tmpdbs_[i]; + Record* rp = recs + i; + rp->buf = tmpdb->get(kbuf, ksiz, &rp->size); + if (rp->buf) { + rsiz += rp->size; + hit = true; + } + } + } + if (!hit) { + delete[] recs; + if (!dvbuf && !cvbuf) { + *sp = 0; + return NULL; + } + if (!dvbuf) { + dvbuf = new char[cvsiz+1]; + std::memcpy(dvbuf, cvbuf, cvsiz); + *sp = cvsiz; + return dvbuf; + } + if (!cvbuf) { + *sp = dvsiz; + return dvbuf; + } + char* rbuf = new char[dvsiz+cvsiz+1]; + std::memcpy(rbuf, dvbuf, dvsiz); + std::memcpy(rbuf + dvsiz, cvbuf, cvsiz); + delete[] dvbuf; + *sp = dvsiz + cvsiz; + return rbuf; + } + if (dvbuf) rsiz += dvsiz; + if (cvbuf) rsiz += cvsiz; + char* rbuf = new char[rsiz+1]; + char* wp = rbuf; + if (dvbuf) { + std::memcpy(wp, dvbuf, dvsiz); + wp += dvsiz; + delete[] dvbuf; + } + if (cvbuf) { + std::memcpy(wp, cvbuf, cvsiz); + wp += cvsiz; + } + if (recs) { + for (size_t i = 0; i < dbnum_; i++) { + Record* rp = recs + i; + if (rp->buf) { + std::memcpy(wp, rp->buf, rp->size); + wp += rp->size; + delete[] rp->buf; + } + } + delete[] recs; + } + *sp = rsiz; + return rbuf; + } + /** + * Retrieve the value of a record. + * @note Equal to the original DB::get method except that the first parameters is the key + * string and the second parameter is a string to contain the result and the return value is + * bool for success. + */ + bool get(const std::string& key, std::string* value) { + _assert_(value); + size_t vsiz; + char* vbuf = get(key.c_str(), key.size(), &vsiz); + if (!vbuf) return false; + value->clear(); + value->append(vbuf, vsiz); + delete[] vbuf; + return true; + } + /** + * Synchronize updated contents with the file and the device. + * @param hard true for physical synchronization with the device, or false for logical + * synchronization with the file system. + * @param proc a postprocessor object. If it is NULL, no postprocessing is performed. + * @return true on success, or false on failure. + * @note The operation of the postprocessor is performed atomically and other threads accessing + * the same record are blocked. To avoid deadlock, any explicit database operation must not + * be performed in this function. + */ + bool synchronize(bool hard = false, BasicDB::FileProcessor* proc = NULL) { + _assert_(true); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + if (!cache_) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); + return false; + } + bool err = false; + if (!flush_cache()) err = true; + if (tmpdbs_ && !merge_tmpdbs()) err = true; + if (!db_.synchronize(hard, proc)) err = true; + return !err; + } + /** + * Remove all records. + * @return true on success, or false on failure. + */ + bool clear() { + _assert_(true); + ScopedRWLock lock(&mlock_, true); + if (omode_ == 0) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "not opened"); + return false; + } + if (!cache_) { + set_error(_KCCODELINE_, BasicDB::Error::INVALID, "permission denied"); + return false; + } + cache_->clear(); + csiz_ = 0; + return db_.clear(); + } + /** + * Get the number of records. + * @return the number of records, or -1 on failure. + */ + int64_t count() { + _assert_(true); + ScopedRWLock lock(&mlock_, false); + return count_impl(); + } + /** + * Get the size of the database file. + * @return the size of the database file in bytes, or -1 on failure. + */ + int64_t size() { + _assert_(true); + ScopedRWLock lock(&mlock_, false); + return size_impl(); + } + /** + * Get the path of the database file. + * @return the path of the database file, or an empty string on failure. + */ + std::string path() { + _assert_(true); + return db_.path(); + } + /** + * Get the miscellaneous status information. + * @param strmap a string map to contain the result. + * @return true on success, or false on failure. + */ + bool status(std::map<std::string, std::string>* strmap) { + _assert_(strmap); + return db_.status(strmap); + } + /** + * Reveal the inner database object. + * @return the inner database object, or NULL on failure. + */ + PolyDB* reveal_inner_db() { + _assert_(true); + return &db_; + } + /** + * Create a cursor object. + * @return the return value is the created cursor object. + * @note Because the object of the return value is allocated by the constructor, it should be + * released with the delete operator when it is no longer in use. + */ + BasicDB::Cursor* cursor() { + _assert_(true); + return db_.cursor(); + } + /** + * Write a log message. + * @param file the file name of the program source code. + * @param line the line number of the program source code. + * @param func the function name of the program source code. + * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal + * information, Logger::WARN for warning, and Logger::ERROR for fatal error. + * @param message the supplement message. + */ + void log(const char* file, int32_t line, const char* func, BasicDB::Logger::Kind kind, + const char* message) { + _assert_(file && line > 0 && func && message); + db_.log(file, line, func, kind, message); + } + /** + * Set the internal logger. + * @param logger the logger object. + * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging, + * Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal + * error. + * @return true on success, or false on failure. + */ + bool tune_logger(BasicDB::Logger* logger, + uint32_t kinds = BasicDB::Logger::WARN | BasicDB::Logger::ERROR) { + _assert_(logger); + return db_.tune_logger(logger, kinds); + } + /** + * Set the internal meta operation trigger. + * @param trigger the trigger object. + * @return true on success, or false on failure. + */ + bool tune_meta_trigger(BasicDB::MetaTrigger* trigger) { + _assert_(trigger); + return db_.tune_meta_trigger(trigger); + } + protected: + /** + * Report a message for debugging. + * @param file the file name of the program source code. + * @param line the line number of the program source code. + * @param func the function name of the program source code. + * @param format the printf-like format string. + * @param ... used according to the format string. + */ + void report(const char* file, int32_t line, const char* func, const char* format, ...) { + _assert_(file && line > 0 && func && format); + std::string message; + va_list ap; + va_start(ap, format); + vstrprintf(&message, format, ap); + va_end(ap); + db_.log(file, line, func, BasicDB::Logger::INFO, message.c_str()); + } + private: + /** + * Flush all cache records. + * @return true on success, or false on failure. + */ + bool flush_cache() { + _assert_(true); + bool err = false; + double stime = time(); + report(_KCCODELINE_, "flushing the cache"); + if (tmpdbs_) { + BasicDB* tmpdb = tmpdbs_[dbclock_]; + TinyHashMap::Sorter sorter(cache_); + const char* kbuf, *vbuf; + size_t ksiz, vsiz; + while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { + if (!tmpdb->append(kbuf, ksiz, vbuf, vsiz)) { + const BasicDB::Error& e = tmpdb->error(); + db_.set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + sorter.step(); + } + dbclock_ = (dbclock_ + 1) % dbnum_; + } else { + TinyHashMap::Sorter sorter(cache_); + const char* kbuf, *vbuf; + size_t ksiz, vsiz; + while ((kbuf = sorter.get(&ksiz, &vbuf, &vsiz)) != NULL) { + if (!db_.append(kbuf, ksiz, vbuf, vsiz)) err = true; + sorter.step(); + } + } + cache_->clear(); + csiz_ = 0; + double etime = time(); + report(_KCCODELINE_, "flushing the cache finished: time=%.6f", etime - stime); + return !err; + } + /** + * Merge temporary databases. + * @return true on success, or false on failure. + */ + bool merge_tmpdbs() { + _assert_(true); + bool err = false; + report(_KCCODELINE_, "merging the temporary databases"); + double stime = time(); + if (!db_.merge(tmpdbs_, dbnum_, PolyDB::MAPPEND)) err = true; + dbclock_ = 0; + for (size_t i = 0; i < dbnum_; i++) { + BasicDB* tmpdb = tmpdbs_[i]; + if (!tmpdb->clear()) { + const BasicDB::Error& e = tmpdb->error(); + set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + } + double etime = time(); + report(_KCCODELINE_, "merging the temporary databases finished: %.6f", etime - stime); + return !err; + } + /** + * Remove a record from databases. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @return true on success, or false on failure. + */ + bool clean_dbs(const char* kbuf, size_t ksiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ); + if (db_.remove(kbuf, ksiz)) return true; + bool err = false; + if (db_.error() != BasicDB::Error::NOREC) err = true; + if (tmpdbs_) { + for (size_t i = 0; i < dbnum_; i++) { + BasicDB* tmpdb = tmpdbs_[i]; + if (!tmpdb->remove(kbuf, ksiz)) { + const BasicDB::Error& e = tmpdb->error(); + if (e != BasicDB::Error::NOREC) { + set_error(_KCCODELINE_, e.code(), e.message()); + err = true; + } + } + } + } + return !err; + } + /** + * Check whether a record exists. + * @param kbuf the pointer to the key region. + * @param ksiz the size of the key region. + * @return true if the record exists, or false if not. + */ + bool check_impl(const char* kbuf, size_t ksiz) { + _assert_(kbuf && ksiz <= MEMMAXSIZ); + char vbuf; + if (db_.get(kbuf, ksiz, &vbuf, 1) >= 0) return true; + if (cache_) { + size_t vsiz; + if (cache_->get(kbuf, ksiz, &vsiz)) return true; + if (tmpdbs_) { + for (size_t i = 0; i < dbnum_; i++) { + BasicDB* tmpdb = tmpdbs_[i]; + if (tmpdb->get(kbuf, ksiz, &vbuf, 1)) return true; + } + } + } + return false; + } + /** + * Get the number of records. + * @return the number of records, or -1 on failure. + */ + int64_t count_impl() { + _assert_(true); + int64_t dbcnt = db_.count(); + if (dbcnt < 0) return -1; + if (!cache_) return dbcnt; + int64_t ccnt = cache_->count(); + return dbcnt > ccnt ? dbcnt : ccnt; + } + /** + * Get the size of the database file. + * @return the size of the database file in bytes. + */ + int64_t size_impl() { + _assert_(true); + int64_t dbsiz = db_.size(); + if (dbsiz < 0) return -1; + return dbsiz + csiz_; + } + /** Dummy constructor to forbid the use. */ + IndexDB(const IndexDB&); + /** Dummy Operator to forbid the use. */ + IndexDB& operator =(const IndexDB&); + /** The method lock. */ + RWLock mlock_; + /** The internal database. */ + PolyDB db_; + /** The open mode. */ + uint32_t omode_; + /** The record comparator. */ + Comparator* rcomp_; + /** The base path of temporary databases. */ + std::string tmppath_; + /** The temporary databases. */ + BasicDB** tmpdbs_; + /** The number of temporary databases. */ + size_t dbnum_; + /** The logical clock for temporary databases. */ + int64_t dbclock_; + /** The internal cache. */ + TinyHashMap* cache_; + /** The current size of the internal cache. */ + int64_t csiz_; + /** The limit size of the internal cache. */ + int64_t clim_; +}; + + +} // common namespace + +#endif // duplication check + +// END OF FILE |