/************************************************************************************************* * Cache hash database * Copyright (C) 2009-2012 FAL Labs * This file is part of Kyoto Cabinet. * This program is free software: you can redistribute it and/or modify it under the terms of * the GNU General Public License as published by the Free Software Foundation, either version * 3 of the License, or any later version. * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU General Public License for more details. * You should have received a copy of the GNU General Public License along with this program. * If not, see . *************************************************************************************************/ #ifndef _KCCACHEDB_H // duplication check #define _KCCACHEDB_H #include #include #include #include #include #include #include #include #include #include namespace kyotocabinet { // common namespace /** * On-memory hash database with LRU deletion. * @note This class is a concrete class to operate a hash database on memory. This class can be * inherited but overwriting methods is forbidden. Before every database operation, it is * necessary to call the CacheDB::open method in order to open a database file and connect the * database object to it. To avoid data missing or corruption, it is important to close every * database file by the CacheDB::close method when the database is no longer in use. It is * forbidden for multible database objects in a process to open the same database at the same * time. It is forbidden to share a database object with child processes. */ class CacheDB : public BasicDB { friend class PlantDB; public: class Cursor; private: struct Record; struct TranLog; struct Slot; class Repeater; class Setter; class Remover; class ScopedVisitor; /** An alias of list of cursors. */ typedef std::list CursorList; /** An alias of list of transaction logs. */ typedef std::list TranLogList; /** The number of slot tables. */ static const int32_t SLOTNUM = 16; /** The default bucket number. */ static const size_t DEFBNUM = 1048583LL; /** The mininum number of buckets to use mmap. */ static const size_t ZMAPBNUM = 32768; /** The maximum size of each key. */ static const uint32_t KSIZMAX = 0xfffff; /** The size of the record buffer. */ static const size_t RECBUFSIZ = 48; /** The size of the opaque buffer. */ static const size_t OPAQUESIZ = 16; /** The threshold of busy loop and sleep for locking. */ static const uint32_t LOCKBUSYLOOP = 8192; public: /** * Cursor to indicate a record. */ class Cursor : public BasicDB::Cursor { friend class CacheDB; public: /** * Constructor. * @param db the container database object. */ explicit Cursor(CacheDB* db) : db_(db), sidx_(-1), rec_(NULL) { _assert_(db); ScopedRWLock lock(&db_->mlock_, true); db_->curs_.push_back(this); } /** * Destructor. */ virtual ~Cursor() { _assert_(true); if (!db_) return; ScopedRWLock lock(&db_->mlock_, true); db_->curs_.remove(this); } /** * Accept a visitor to the current record. * @param visitor a visitor object. * @param writable true for writable operation, or false for read-only operation. * @param step true to move the cursor to the next record, or false for no move. * @return true on success, or false on failure. * @note The operation for each record is performed atomically and other threads accessing * the same record are blocked. To avoid deadlock, any explicit database operation must not * be performed in this function. */ bool accept(Visitor* visitor, bool writable = true, bool step = false) { _assert_(visitor); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (writable && !(db_->omode_ & OWRITER)) { db_->set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); return false; } if (sidx_ < 0 || !rec_) { db_->set_error(_KCCODELINE_, Error::NOREC, "no record"); return false; } uint32_t rksiz = rec_->ksiz & KSIZMAX; char* dbuf = (char*)rec_ + sizeof(*rec_); const char* rvbuf = dbuf + rksiz; size_t rvsiz = rec_->vsiz; char* zbuf = NULL; size_t zsiz = 0; if (db_->comp_) { zbuf = db_->comp_->decompress(rvbuf, rvsiz, &zsiz); if (zbuf) { rvbuf = zbuf; rvsiz = zsiz; } } size_t vsiz; const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vsiz); delete[] zbuf; if (vbuf == Visitor::REMOVE) { uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; Slot* slot = db_->slots_ + sidx_; Repeater repeater(Visitor::REMOVE, 0); db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, false); } else if (vbuf == Visitor::NOP) { if (step) step_impl(); } else { uint64_t hash = db_->hash_record(dbuf, rksiz) / SLOTNUM; Slot* slot = db_->slots_ + sidx_; Repeater repeater(vbuf, vsiz); db_->accept_impl(slot, hash, dbuf, rksiz, &repeater, db_->comp_, false); if (step) step_impl(); } return true; } /** * Jump the cursor to the first record for forward scan. * @return true on success, or false on failure. */ bool jump() { _assert_(true); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } for (int32_t i = 0; i < SLOTNUM; i++) { Slot* slot = db_->slots_ + i; if (slot->first) { sidx_ = i; rec_ = slot->first; return true; } } db_->set_error(_KCCODELINE_, Error::NOREC, "no record"); sidx_ = -1; rec_ = NULL; return false; } /** * Jump the cursor to a record for forward scan. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @return true on success, or false on failure. */ bool jump(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (ksiz > KSIZMAX) ksiz = KSIZMAX; uint64_t hash = db_->hash_record(kbuf, ksiz); int32_t sidx = hash % SLOTNUM; hash /= SLOTNUM; Slot* slot = db_->slots_ + sidx; size_t bidx = hash % slot->bnum; Record* rec = slot->buckets[bidx]; Record** entp = slot->buckets + bidx; uint32_t fhash = db_->fold_hash(hash) & ~KSIZMAX; while (rec) { uint32_t rhash = rec->ksiz & ~KSIZMAX; uint32_t rksiz = rec->ksiz & KSIZMAX; if (fhash > rhash) { entp = &rec->left; rec = rec->left; } else if (fhash < rhash) { entp = &rec->right; rec = rec->right; } else { char* dbuf = (char*)rec + sizeof(*rec); int32_t kcmp = db_->compare_keys(kbuf, ksiz, dbuf, rksiz); if (kcmp < 0) { entp = &rec->left; rec = rec->left; } else if (kcmp > 0) { entp = &rec->right; rec = rec->right; } else { sidx_ = sidx; rec_ = rec; return true; } } } db_->set_error(_KCCODELINE_, Error::NOREC, "no record"); sidx_ = -1; rec_ = NULL; return false; } /** * Jump the cursor to a record for forward scan. * @note Equal to the original Cursor::jump method except that the parameter is std::string. */ bool jump(const std::string& key) { _assert_(true); return jump(key.c_str(), key.size()); } /** * Jump the cursor to the last record for backward scan. * @note This is a dummy implementation for compatibility. */ bool jump_back() { _assert_(true); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented"); return false; } /** * Jump the cursor to a record for backward scan. * @note This is a dummy implementation for compatibility. */ bool jump_back(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented"); return false; } /** * Jump the cursor to a record for backward scan. * @note This is a dummy implementation for compatibility. */ bool jump_back(const std::string& key) { _assert_(true); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented"); return false; } /** * Step the cursor to the next record. * @return true on success, or false on failure. */ bool step() { _assert_(true); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (sidx_ < 0 || !rec_) { db_->set_error(_KCCODELINE_, Error::NOREC, "no record"); return false; } bool err = false; if (!step_impl()) err = true; return !err; } /** * Step the cursor to the previous record. * @note This is a dummy implementation for compatibility. */ bool step_back() { _assert_(true); ScopedRWLock lock(&db_->mlock_, true); if (db_->omode_ == 0) { db_->set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } db_->set_error(_KCCODELINE_, Error::NOIMPL, "not implemented"); return false; } /** * Get the database object. * @return the database object. */ CacheDB* db() { _assert_(true); return db_; } private: /** * Step the cursor to the next record. * @return true on success, or false on failure. */ bool step_impl() { _assert_(true); rec_ = rec_->next; if (!rec_) { for (int32_t i = sidx_ + 1; i < SLOTNUM; i++) { Slot* slot = db_->slots_ + i; if (slot->first) { sidx_ = i; rec_ = slot->first; return true; } } db_->set_error(_KCCODELINE_, Error::NOREC, "no record"); sidx_ = -1; rec_ = NULL; return false; } return true; } /** Dummy constructor to forbid the use. */ Cursor(const Cursor&); /** Dummy Operator to forbid the use. */ Cursor& operator =(const Cursor&); /** The inner database. */ CacheDB* db_; /** The index of the current slot. */ int32_t sidx_; /** The current record. */ Record* rec_; }; /** * Tuning options. */ enum Option { TSMALL = 1 << 0, ///< dummy for compatibility TLINEAR = 1 << 1, ///< dummy for compatibility TCOMPRESS = 1 << 2 ///< compress each record }; /** * Status flags. */ enum Flag { FOPEN = 1 << 0, ///< dummy for compatibility FFATAL = 1 << 1 ///< dummy for compatibility }; /** * Default constructor. */ explicit CacheDB() : mlock_(), flock_(), error_(), logger_(NULL), logkinds_(0), mtrigger_(NULL), omode_(0), curs_(), path_(""), type_(TYPECACHE), opts_(0), bnum_(DEFBNUM), capcnt_(-1), capsiz_(-1), opaque_(), embcomp_(ZLIBRAWCOMP), comp_(NULL), slots_(), rttmode_(true), tran_(false) { _assert_(true); } /** * Destructor. * @note If the database is not closed, it is closed implicitly. */ virtual ~CacheDB() { _assert_(true); if (omode_ != 0) close(); if (!curs_.empty()) { CursorList::const_iterator cit = curs_.begin(); CursorList::const_iterator citend = curs_.end(); while (cit != citend) { Cursor* cur = *cit; cur->db_ = NULL; ++cit; } } } /** * Accept a visitor to a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param visitor a visitor object. * @param writable true for writable operation, or false for read-only operation. * @return true on success, or false on failure. * @note The operation for each record is performed atomically and other threads accessing the * same record are blocked. To avoid deadlock, any explicit database operation must not be * performed in this function. */ bool accept(const char* kbuf, size_t ksiz, Visitor* visitor, bool writable = true) { _assert_(kbuf && ksiz <= MEMMAXSIZ && visitor); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (writable && !(omode_ & OWRITER)) { set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); return false; } if (ksiz > KSIZMAX) ksiz = KSIZMAX; uint64_t hash = hash_record(kbuf, ksiz); int32_t sidx = hash % SLOTNUM; hash /= SLOTNUM; Slot* slot = slots_ + sidx; slot->lock.lock(); accept_impl(slot, hash, kbuf, ksiz, visitor, comp_, rttmode_); slot->lock.unlock(); return true; } /** * Accept a visitor to multiple records at once. * @param keys specifies a string vector of the keys. * @param visitor a visitor object. * @param writable true for writable operation, or false for read-only operation. * @return true on success, or false on failure. * @note The operations for specified records are performed atomically and other threads * accessing the same records are blocked. To avoid deadlock, any explicit database operation * must not be performed in this function. */ bool accept_bulk(const std::vector& keys, Visitor* visitor, bool writable = true) { _assert_(visitor); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (writable && !(omode_ & OWRITER)) { set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); return false; } ScopedVisitor svis(visitor); size_t knum = keys.size(); if (knum < 1) return true; struct RecordKey { const char* kbuf; size_t ksiz; uint64_t hash; int32_t sidx; }; RecordKey* rkeys = new RecordKey[knum]; std::set sidxs; for (size_t i = 0; i < knum; i++) { const std::string& key = keys[i]; RecordKey* rkey = rkeys + i; rkey->kbuf = key.data(); rkey->ksiz = key.size(); if (rkey->ksiz > KSIZMAX) rkey->ksiz = KSIZMAX; rkey->hash = hash_record(rkey->kbuf, rkey->ksiz); rkey->sidx = rkey->hash % SLOTNUM; sidxs.insert(rkey->sidx); rkey->hash /= SLOTNUM; } std::set::iterator sit = sidxs.begin(); std::set::iterator sitend = sidxs.end(); while (sit != sitend) { Slot* slot = slots_ + *sit; slot->lock.lock(); ++sit; } for (size_t i = 0; i < knum; i++) { RecordKey* rkey = rkeys + i; Slot* slot = slots_ + rkey->sidx; accept_impl(slot, rkey->hash, rkey->kbuf, rkey->ksiz, visitor, comp_, rttmode_); } sit = sidxs.begin(); sitend = sidxs.end(); while (sit != sitend) { Slot* slot = slots_ + *sit; slot->lock.unlock(); ++sit; } delete[] rkeys; return true; } /** * Iterate to accept a visitor for each record. * @param visitor a visitor object. * @param writable true for writable operation, or false for read-only operation. * @param checker a progress checker object. If it is NULL, no checking is performed. * @return true on success, or false on failure. * @note The whole iteration is performed atomically and other threads are blocked. To avoid * deadlock, any explicit database operation must not be performed in this function. */ bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* checker = NULL) { _assert_(visitor); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (writable && !(omode_ & OWRITER)) { set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); return false; } ScopedVisitor svis(visitor); int64_t allcnt = count_impl(); if (checker && !checker->check("iterate", "beginning", 0, allcnt)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } int64_t curcnt = 0; for (int32_t i = 0; i < SLOTNUM; i++) { Slot* slot = slots_ + i; Record* rec = slot->first; while (rec) { Record* next = rec->next; uint32_t rksiz = rec->ksiz & KSIZMAX; char* dbuf = (char*)rec + sizeof(*rec); const char* rvbuf = dbuf + rksiz; size_t rvsiz = rec->vsiz; char* zbuf = NULL; size_t zsiz = 0; if (comp_) { zbuf = comp_->decompress(rvbuf, rvsiz, &zsiz); if (zbuf) { rvbuf = zbuf; rvsiz = zsiz; } } size_t vsiz; const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vsiz); delete[] zbuf; if (vbuf == Visitor::REMOVE) { uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; Repeater repeater(Visitor::REMOVE, 0); accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false); } else if (vbuf != Visitor::NOP) { uint64_t hash = hash_record(dbuf, rksiz) / SLOTNUM; Repeater repeater(vbuf, vsiz); accept_impl(slot, hash, dbuf, rksiz, &repeater, comp_, false); } rec = next; curcnt++; if (checker && !checker->check("iterate", "processing", curcnt, allcnt)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } } } if (checker && !checker->check("iterate", "ending", -1, allcnt)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } trigger_meta(MetaTrigger::ITERATE, "iterate"); return true; } /** * Scan each record in parallel. * @param visitor a visitor object. * @param thnum the number of worker threads. * @param checker a progress checker object. If it is NULL, no checking is performed. * @return true on success, or false on failure. * @note This function is for reading records and not for updating ones. The return value of * the visitor is just ignored. To avoid deadlock, any explicit database operation must not * be performed in this function. */ bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* checker = NULL) { _assert_(visitor && thnum <= MEMMAXSIZ); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (thnum < 1) thnum = 1; thnum = std::pow(2.0, (int32_t)(std::log(thnum * std::sqrt(2.0)) / std::log(2.0))); if (thnum > (size_t)SLOTNUM) thnum = SLOTNUM; ScopedVisitor svis(visitor); int64_t allcnt = count_impl(); if (checker && !checker->check("scan_parallel", "beginning", -1, allcnt)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } class ThreadImpl : public Thread { public: explicit ThreadImpl() : db_(NULL), visitor_(NULL), checker_(NULL), allcnt_(0), slots_(), error_() {} void init(CacheDB* db, Visitor* visitor, ProgressChecker* checker, int64_t allcnt) { db_ = db; visitor_ = visitor; checker_ = checker; allcnt_ = allcnt; } void add_slot(Slot* slot) { slots_.push_back(slot); } const Error& error() { return error_; } private: void run() { CacheDB* db = db_; Visitor* visitor = visitor_; ProgressChecker* checker = checker_; int64_t allcnt = allcnt_; Compressor* comp = db->comp_; std::vector::iterator sit = slots_.begin(); std::vector::iterator sitend = slots_.end(); while (sit != sitend) { Slot* slot = *sit; Record* rec = slot->first; while (rec) { Record* next = rec->next; uint32_t rksiz = rec->ksiz & KSIZMAX; char* dbuf = (char*)rec + sizeof(*rec); const char* rvbuf = dbuf + rksiz; size_t rvsiz = rec->vsiz; char* zbuf = NULL; size_t zsiz = 0; if (comp) { zbuf = comp->decompress(rvbuf, rvsiz, &zsiz); if (zbuf) { rvbuf = zbuf; rvsiz = zsiz; } } size_t vsiz; visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vsiz); delete[] zbuf; rec = next; if (checker && !checker->check("scan_parallel", "processing", -1, allcnt)) { db->set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); error_ = db->error(); break; } } ++sit; } } CacheDB* db_; Visitor* visitor_; ProgressChecker* checker_; int64_t allcnt_; std::vector slots_; Error error_; }; bool err = false; bool orttmode = rttmode_; rttmode_ = false; ThreadImpl* threads = new ThreadImpl[thnum]; for (int32_t i = 0; i < SLOTNUM; i++) { ThreadImpl* thread = threads + (i % thnum); thread->add_slot(slots_ + i); } for (size_t i = 0; i < thnum; i++) { ThreadImpl* thread = threads + i; thread->init(this, visitor, checker, allcnt); thread->start(); } for (size_t i = 0; i < thnum; i++) { ThreadImpl* thread = threads + i; thread->join(); if (thread->error() != Error::SUCCESS) { *error_ = thread->error(); err = true; } } delete[] threads; rttmode_ = orttmode; if (err) return false; if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } trigger_meta(MetaTrigger::ITERATE, "scan_parallel"); return true; } /** * Get the last happened error. * @return the last happened error. */ Error error() const { _assert_(true); return error_; } /** * Set the error information. * @param file the file name of the program source code. * @param line the line number of the program source code. * @param func the function name of the program source code. * @param code an error code. * @param message a supplement message. */ void set_error(const char* file, int32_t line, const char* func, Error::Code code, const char* message) { _assert_(file && line > 0 && func && message); error_->set(code, message); if (logger_) { Logger::Kind kind = code == Error::BROKEN || code == Error::SYSTEM ? Logger::ERROR : Logger::INFO; if (kind & logkinds_) report(file, line, func, kind, "%d: %s: %s", code, Error::codename(code), message); } } /** * Open a database file. * @param path the path of a database file. * @param mode the connection mode. CacheDB::OWRITER as a writer, CacheDB::OREADER as a * reader. The following may be added to the writer mode by bitwise-or: CacheDB::OCREATE, * which means it creates a new database if the file does not exist, CacheDB::OTRUNCATE, which * means it creates a new database regardless if the file exists, CacheDB::OAUTOTRAN, which * means each updating operation is performed in implicit transaction, CacheDB::OAUTOSYNC, * which means each updating operation is followed by implicit synchronization with the file * system. The following may be added to both of the reader mode and the writer mode by * bitwise-or: CacheDB::ONOLOCK, which means it opens the database file without file locking, * CacheDB::OTRYLOCK, which means locking is performed without blocking, CacheDB::ONOREPAIR, * which means the database file is not repaired implicitly even if file destruction is * detected. * @return true on success, or false on failure. * @note Every opened database must be closed by the CacheDB::close method when it is no * longer in use. It is not allowed for two or more database objects in the same process to * keep their connections to the same database file at the same time. */ bool open(const std::string& path, uint32_t mode = OWRITER | OCREATE) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } report(_KCCODELINE_, Logger::DEBUG, "opening the database (path=%s)", path.c_str()); omode_ = mode; path_.append(path); size_t bnum = nearbyprime(bnum_ / SLOTNUM); size_t capcnt = capcnt_ > 0 ? capcnt_ / SLOTNUM + 1 : (1ULL << (sizeof(capcnt) * 8 - 1)); size_t capsiz = capsiz_ > 0 ? capsiz_ / SLOTNUM + 1 : (1ULL << (sizeof(capsiz) * 8 - 1)); if (capsiz > sizeof(*this) / SLOTNUM) capsiz -= sizeof(*this) / SLOTNUM; if (capsiz > bnum * sizeof(Record*)) capsiz -= bnum * sizeof(Record*); for (int32_t i = 0; i < SLOTNUM; i++) { initialize_slot(slots_ + i, bnum, capcnt, capsiz); } comp_ = (opts_ & TCOMPRESS) ? embcomp_ : NULL; std::memset(opaque_, 0, sizeof(opaque_)); trigger_meta(MetaTrigger::OPEN, "open"); return true; } /** * Close the database file. * @return true on success, or false on failure. */ bool close() { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } report(_KCCODELINE_, Logger::DEBUG, "closing the database (path=%s)", path_.c_str()); tran_ = false; for (int32_t i = SLOTNUM - 1; i >= 0; i--) { destroy_slot(slots_ + i); } path_.clear(); omode_ = 0; trigger_meta(MetaTrigger::CLOSE, "close"); return true; } /** * Synchronize updated contents with the file and the device. * @param hard true for physical synchronization with the device, or false for logical * synchronization with the file system. * @param proc a postprocessor object. If it is NULL, no postprocessing is performed. * @param checker a progress checker object. If it is NULL, no checking is performed. * @return true on success, or false on failure. * @note The operation of the postprocessor is performed atomically and other threads accessing * the same record are blocked. To avoid deadlock, any explicit database operation must not * be performed in this function. */ bool synchronize(bool hard = false, FileProcessor* proc = NULL, ProgressChecker* checker = NULL) { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } bool err = false; if ((omode_ & OWRITER) && checker && !checker->check("synchronize", "nothing to be synchronized", -1, -1)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } if (proc) { if (checker && !checker->check("synchronize", "running the post processor", -1, -1)) { set_error(_KCCODELINE_, Error::LOGIC, "checker failed"); return false; } if (!proc->process(path_, count_impl(), size_impl())) { set_error(_KCCODELINE_, Error::LOGIC, "postprocessing failed"); err = true; } } trigger_meta(MetaTrigger::SYNCHRONIZE, "synchronize"); return !err; } /** * Occupy database by locking and do something meanwhile. * @param writable true to use writer lock, or false to use reader lock. * @param proc a processor object. If it is NULL, no processing is performed. * @return true on success, or false on failure. * @note The operation of the processor is performed atomically and other threads accessing * the same record are blocked. To avoid deadlock, any explicit database operation must not * be performed in this function. */ bool occupy(bool writable = true, FileProcessor* proc = NULL) { _assert_(true); ScopedRWLock lock(&mlock_, writable); bool err = false; if (proc && !proc->process(path_, count_impl(), size_impl())) { set_error(_KCCODELINE_, Error::LOGIC, "processing failed"); err = true; } trigger_meta(MetaTrigger::OCCUPY, "occupy"); return !err; } /** * Begin transaction. * @param hard true for physical synchronization with the device, or false for logical * synchronization with the file system. * @return true on success, or false on failure. */ bool begin_transaction(bool hard = false) { _assert_(true); uint32_t wcnt = 0; while (true) { mlock_.lock_writer(); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); mlock_.unlock(); return false; } if (!(omode_ & OWRITER)) { set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); mlock_.unlock(); return false; } if (!tran_) break; mlock_.unlock(); if (wcnt >= LOCKBUSYLOOP) { Thread::chill(); } else { Thread::yield(); wcnt++; } } tran_ = true; trigger_meta(MetaTrigger::BEGINTRAN, "begin_transaction"); mlock_.unlock(); return true; } /** * Try to begin transaction. * @param hard true for physical synchronization with the device, or false for logical * synchronization with the file system. * @return true on success, or false on failure. */ bool begin_transaction_try(bool hard = false) { _assert_(true); mlock_.lock_writer(); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); mlock_.unlock(); return false; } if (!(omode_ & OWRITER)) { set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); mlock_.unlock(); return false; } if (tran_) { set_error(_KCCODELINE_, Error::LOGIC, "competition avoided"); mlock_.unlock(); return false; } tran_ = true; trigger_meta(MetaTrigger::BEGINTRAN, "begin_transaction_try"); mlock_.unlock(); return true; } /** * End transaction. * @param commit true to commit the transaction, or false to abort the transaction. * @return true on success, or false on failure. */ bool end_transaction(bool commit = true) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (!tran_) { set_error(_KCCODELINE_, Error::INVALID, "not in transaction"); return false; } if (!commit) disable_cursors(); for (int32_t i = 0; i < SLOTNUM; i++) { if (!commit) apply_slot_trlogs(slots_ + i); slots_[i].trlogs.clear(); adjust_slot_capacity(slots_ + i); } tran_ = false; trigger_meta(commit ? MetaTrigger::COMMITTRAN : MetaTrigger::ABORTTRAN, "end_transaction"); return true; } /** * Remove all records. * @return true on success, or false on failure. */ bool clear() { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } disable_cursors(); for (int32_t i = 0; i < SLOTNUM; i++) { Slot* slot = slots_ + i; clear_slot(slot); } std::memset(opaque_, 0, sizeof(opaque_)); trigger_meta(MetaTrigger::CLEAR, "clear"); return true; } /** * Get the number of records. * @return the number of records, or -1 on failure. */ int64_t count() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return -1; } return count_impl(); } /** * Get the size of the database file. * @return the size of the database file in bytes, or -1 on failure. */ int64_t size() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return -1; } return size_impl(); } /** * Get the path of the database file. * @return the path of the database file, or an empty string on failure. */ std::string path() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return ""; } return path_; } /** * Get the miscellaneous status information. * @param strmap a string map to contain the result. * @return true on success, or false on failure. */ bool status(std::map* strmap) { _assert_(strmap); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } (*strmap)["type"] = strprintf("%u", (unsigned)TYPECACHE); (*strmap)["realtype"] = strprintf("%u", (unsigned)type_); (*strmap)["path"] = path_; (*strmap)["libver"] = strprintf("%u", LIBVER); (*strmap)["librev"] = strprintf("%u", LIBREV); (*strmap)["fmtver"] = strprintf("%u", FMTVER); (*strmap)["chksum"] = strprintf("%u", 0xff); (*strmap)["opts"] = strprintf("%u", opts_); (*strmap)["bnum"] = strprintf("%lld", (long long)bnum_); (*strmap)["capcnt"] = strprintf("%lld", (long long)capcnt_); (*strmap)["capsiz"] = strprintf("%lld", (long long)capsiz_); (*strmap)["recovered"] = strprintf("%d", false); (*strmap)["reorganized"] = strprintf("%d", false); if (strmap->count("opaque") > 0) (*strmap)["opaque"] = std::string(opaque_, sizeof(opaque_)); if (strmap->count("bnum_used") > 0) { int64_t cnt = 0; for (int32_t i = 0; i < SLOTNUM; i++) { Slot* slot = slots_ + i; Record** buckets = slot->buckets; size_t bnum = slot->bnum; for (size_t j = 0; j < bnum; j++) { if (buckets[j]) cnt++; } } (*strmap)["bnum_used"] = strprintf("%lld", (long long)cnt); } (*strmap)["count"] = strprintf("%lld", (long long)count_impl()); (*strmap)["size"] = strprintf("%lld", (long long)size_impl()); return true; } /** * Create a cursor object. * @return the return value is the created cursor object. * @note Because the object of the return value is allocated by the constructor, it should be * released with the delete operator when it is no longer in use. */ Cursor* cursor() { _assert_(true); return new Cursor(this); } /** * Write a log message. * @param file the file name of the program source code. * @param line the line number of the program source code. * @param func the function name of the program source code. * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal * information, Logger::WARN for warning, and Logger::ERROR for fatal error. * @param message the supplement message. */ void log(const char* file, int32_t line, const char* func, Logger::Kind kind, const char* message) { _assert_(file && line > 0 && func && message); ScopedRWLock lock(&mlock_, false); if (!logger_) return; logger_->log(file, line, func, kind, message); } /** * Set the internal logger. * @param logger the logger object. * @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging, * Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal * error. * @return true on success, or false on failure. */ bool tune_logger(Logger* logger, uint32_t kinds = Logger::WARN | Logger::ERROR) { _assert_(logger); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } logger_ = logger; logkinds_ = kinds; return true; } /** * Set the internal meta operation trigger. * @param trigger the trigger object. * @return true on success, or false on failure. */ bool tune_meta_trigger(MetaTrigger* trigger) { _assert_(trigger); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } mtrigger_ = trigger; return true; } /** * Set the optional features. * @param opts the optional features by bitwise-or: DirDB::TCOMPRESS to compress each record. * @return true on success, or false on failure. */ bool tune_options(int8_t opts) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } opts_ = opts; return true; } /** * Set the number of buckets of the hash table. * @param bnum the number of buckets of the hash table. * @return true on success, or false on failure. */ bool tune_buckets(int64_t bnum) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } bnum_ = bnum >= 0 ? bnum : DEFBNUM; return true; } /** * Set the data compressor. * @param comp the data compressor object. * @return true on success, or false on failure. */ bool tune_compressor(Compressor* comp) { _assert_(comp); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } embcomp_ = comp; return true; } /** * Set the capacity by record number. * @param count the maximum number of records. * @return true on success, or false on failure. */ bool cap_count(int64_t count) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } capcnt_ = count; return true; } /** * Set the capacity by memory usage. * @param size the maximum size of memory usage. * @return true on success, or false on failure. */ bool cap_size(int64_t size) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } capsiz_ = size; return true; } /** * Switch the mode of LRU rotation. * @param rttmode true to enable LRU rotation, false to disable LRU rotation. * @return true on success, or false on failure. * @note This function can be called while the database is opened. */ bool switch_rotation(bool rttmode) { _assert_(true); ScopedRWLock lock(&mlock_, true); rttmode_ = rttmode; return true; } /** * Get the opaque data. * @return the pointer to the opaque data region, whose size is 16 bytes. */ char* opaque() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return NULL; } return opaque_; } /** * Synchronize the opaque data. * @return true on success, or false on failure. */ bool synchronize_opaque() { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } if (!(omode_ & OWRITER)) { set_error(_KCCODELINE_, Error::NOPERM, "permission denied"); return false; } return true; } protected: /** * Report a message for debugging. * @param file the file name of the program source code. * @param line the line number of the program source code. * @param func the function name of the program source code. * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal * information, Logger::WARN for warning, and Logger::ERROR for fatal error. * @param format the printf-like format string. * @param ... used according to the format string. */ void report(const char* file, int32_t line, const char* func, Logger::Kind kind, const char* format, ...) { _assert_(file && line > 0 && func && format); if (!logger_ || !(kind & logkinds_)) return; std::string message; strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str()); va_list ap; va_start(ap, format); vstrprintf(&message, format, ap); va_end(ap); logger_->log(file, line, func, kind, message.c_str()); } /** * Report a message for debugging with variable number of arguments. * @param file the file name of the program source code. * @param line the line number of the program source code. * @param func the function name of the program source code. * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal * information, Logger::WARN for warning, and Logger::ERROR for fatal error. * @param format the printf-like format string. * @param ap used according to the format string. */ void report_valist(const char* file, int32_t line, const char* func, Logger::Kind kind, const char* format, va_list ap) { _assert_(file && line > 0 && func && format); if (!logger_ || !(kind & logkinds_)) return; std::string message; strprintf(&message, "%s: ", path_.empty() ? "-" : path_.c_str()); vstrprintf(&message, format, ap); logger_->log(file, line, func, kind, message.c_str()); } /** * Report the content of a binary buffer for debugging. * @param file the file name of the epicenter. * @param line the line number of the epicenter. * @param func the function name of the program source code. * @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal * information, Logger::WARN for warning, and Logger::ERROR for fatal error. * @param name the name of the information. * @param buf the binary buffer. * @param size the size of the binary buffer */ void report_binary(const char* file, int32_t line, const char* func, Logger::Kind kind, const char* name, const char* buf, size_t size) { _assert_(file && line > 0 && func && name && buf && size <= MEMMAXSIZ); if (!logger_) return; char* hex = hexencode(buf, size); report(file, line, func, kind, "%s=%s", name, hex); delete[] hex; } /** * Trigger a meta database operation. * @param kind the kind of the event. MetaTrigger::OPEN for opening, MetaTrigger::CLOSE for * closing, MetaTrigger::CLEAR for clearing, MetaTrigger::ITERATE for iteration, * MetaTrigger::SYNCHRONIZE for synchronization, MetaTrigger::BEGINTRAN for beginning * transaction, MetaTrigger::COMMITTRAN for committing transaction, MetaTrigger::ABORTTRAN * for aborting transaction, and MetaTrigger::MISC for miscellaneous operations. * @param message the supplement message. */ void trigger_meta(MetaTrigger::Kind kind, const char* message) { _assert_(message); if (mtrigger_) mtrigger_->trigger(kind, message); } /** * Set the database type. * @param type the database type. * @return true on success, or false on failure. */ bool tune_type(int8_t type) { _assert_(true); ScopedRWLock lock(&mlock_, true); if (omode_ != 0) { set_error(_KCCODELINE_, Error::INVALID, "already opened"); return false; } type_ = type; return true; } /** * Get the library version. * @return the library version, or 0 on failure. */ uint8_t libver() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return 0; } return LIBVER; } /** * Get the library revision. * @return the library revision, or 0 on failure. */ uint8_t librev() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return 0; } return LIBREV; } /** * Get the format version. * @return the format version, or 0 on failure. */ uint8_t fmtver() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return 0; } return FMTVER; } /** * Get the module checksum. * @return the module checksum, or 0 on failure. */ uint8_t chksum() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return 0; } return 0xff; } /** * Get the database type. * @return the database type, or 0 on failure. */ uint8_t type() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return 0; } return type_; } /** * Get the options. * @return the options, or 0 on failure. */ uint8_t opts() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return 0; } return opts_; } /** * Get the data compressor. * @return the data compressor, or NULL on failure. */ Compressor* comp() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return NULL; } return comp_; } /** * Check whether the database was recovered or not. * @return true if recovered, or false if not. */ bool recovered() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } return false; } /** * Check whether the database was reorganized or not. * @return true if reorganized, or false if not. */ bool reorganized() { _assert_(true); ScopedRWLock lock(&mlock_, false); if (omode_ == 0) { set_error(_KCCODELINE_, Error::INVALID, "not opened"); return false; } return false; } private: /** * Set the power of the alignment of record size. * @note This is a dummy implementation for compatibility. */ bool tune_alignment(int8_t apow) { return true; } /** * Set the power of the capacity of the free block pool. * @note This is a dummy implementation for compatibility. */ bool tune_fbp(int8_t fpow) { return true; } /** * Set the size of the internal memory-mapped region. * @note This is a dummy implementation for compatibility. */ bool tune_map(int64_t msiz) { return true; } /** * Set the unit step number of auto defragmentation. * @note This is a dummy implementation for compatibility. */ bool tune_defrag(int64_t dfunit) { return true; } /** * Perform defragmentation of the file. * @note This is a dummy implementation for compatibility. */ bool defrag(int64_t step = 0) { return true; } /** * Get the status flags. * @note This is a dummy implementation for compatibility. */ uint8_t flags() { return 0; } /** * Get the alignment power. * @note This is a dummy implementation for compatibility. */ uint8_t apow() { return 0; } /** * Get the free block pool power. * @note This is a dummy implementation for compatibility. */ uint8_t fpow() { return 0; } /** * Get the bucket number. * @note This is a dummy implementation for compatibility. */ int64_t bnum() { return 1; } /** * Get the size of the internal memory-mapped region. * @note This is a dummy implementation for compatibility. */ int64_t msiz() { return 0; } /** * Get the unit step number of auto defragmentation. * @note This is a dummy implementation for compatibility. */ int64_t dfunit() { return 0; } private: /** * Record data. */ struct Record { uint32_t ksiz; ///< size of the key uint32_t vsiz; ///< size of the value Record* left; ///< left child record Record* right; ///< right child record Record* prev; ///< privious record Record* next; ///< next record }; /** * Transaction log. */ struct TranLog { bool full; ///< flag whether full std::string key; ///< old key std::string value; ///< old value /** constructor for a full record */ explicit TranLog(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz) : full(true), key(kbuf, ksiz), value(vbuf, vsiz) { _assert_(true); } /** constructor for an empty record */ explicit TranLog(const char* kbuf, size_t ksiz) : full(false), key(kbuf, ksiz) { _assert_(true); } }; /** * Slot table. */ struct Slot { Mutex lock; ///< lock Record** buckets; ///< bucket array size_t bnum; ///< number of buckets size_t capcnt; ///< cap of record number size_t capsiz; ///< cap of memory usage Record* first; ///< first record Record* last; ///< last record size_t count; ///< number of records size_t size; ///< total size of records TranLogList trlogs; ///< transaction logs size_t trsize; ///< size before transaction }; /** * Repeating visitor. */ class Repeater : public Visitor { public: /** constructor */ explicit Repeater(const char* vbuf, size_t vsiz) : vbuf_(vbuf), vsiz_(vsiz) {} private: /** process a full record */ const char* visit_full(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz, size_t* sp) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ && sp); *sp = vsiz_; return vbuf_; } const char* vbuf_; ///< region of the value size_t vsiz_; ///< size of the value }; /** * Setting visitor. */ class Setter : public Visitor { public: /** constructor */ explicit Setter(const char* vbuf, size_t vsiz) : vbuf_(vbuf), vsiz_(vsiz) {} private: /** process a full record */ const char* visit_full(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz, size_t* sp) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ && sp); *sp = vsiz_; return vbuf_; } /** process an empty record */ const char* visit_empty(const char* kbuf, size_t ksiz, size_t* sp) { _assert_(kbuf && ksiz <= MEMMAXSIZ && sp); *sp = vsiz_; return vbuf_; } const char* vbuf_; ///< region of the value size_t vsiz_; ///< size of the value }; /** * Removing visitor. */ class Remover : public Visitor { private: /** visit a record */ const char* visit_full(const char* kbuf, size_t ksiz, const char* vbuf, size_t vsiz, size_t* sp) { _assert_(kbuf && ksiz <= MEMMAXSIZ && vbuf && vsiz <= MEMMAXSIZ && sp); return REMOVE; } }; /** * Scoped visitor. */ class ScopedVisitor { public: /** constructor */ explicit ScopedVisitor(Visitor* visitor) : visitor_(visitor) { _assert_(visitor); visitor_->visit_before(); } /** destructor */ ~ScopedVisitor() { _assert_(true); visitor_->visit_after(); } private: Visitor* visitor_; ///< visitor }; /** * Accept a visitor to a record. * @param slot the slot of the record. * @param hash the hash value of the key. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @param visitor a visitor object. * @param comp the data compressor. * @param rtt whether to move the record to the last. */ void accept_impl(Slot* slot, uint64_t hash, const char* kbuf, size_t ksiz, Visitor* visitor, Compressor* comp, bool rtt) { _assert_(slot && kbuf && ksiz <= MEMMAXSIZ && visitor); size_t bidx = hash % slot->bnum; Record* rec = slot->buckets[bidx]; Record** entp = slot->buckets + bidx; uint32_t fhash = fold_hash(hash) & ~KSIZMAX; while (rec) { uint32_t rhash = rec->ksiz & ~KSIZMAX; uint32_t rksiz = rec->ksiz & KSIZMAX; if (fhash > rhash) { entp = &rec->left; rec = rec->left; } else if (fhash < rhash) { entp = &rec->right; rec = rec->right; } else { char* dbuf = (char*)rec + sizeof(*rec); int32_t kcmp = compare_keys(kbuf, ksiz, dbuf, rksiz); if (kcmp < 0) { entp = &rec->left; rec = rec->left; } else if (kcmp > 0) { entp = &rec->right; rec = rec->right; } else { const char* rvbuf = dbuf + rksiz; size_t rvsiz = rec->vsiz; char* zbuf = NULL; size_t zsiz = 0; if (comp) { zbuf = comp->decompress(rvbuf, rvsiz, &zsiz); if (zbuf) { rvbuf = zbuf; rvsiz = zsiz; } } size_t vsiz; const char* vbuf = visitor->visit_full(dbuf, rksiz, rvbuf, rvsiz, &vsiz); delete[] zbuf; if (vbuf == Visitor::REMOVE) { if (tran_) { TranLog log(kbuf, ksiz, dbuf + rksiz, rec->vsiz); slot->trlogs.push_back(log); } if (!curs_.empty()) escape_cursors(rec); if (rec == slot->first) slot->first = rec->next; if (rec == slot->last) slot->last = rec->prev; if (rec->prev) rec->prev->next = rec->next; if (rec->next) rec->next->prev = rec->prev; if (rec->left && !rec->right) { *entp = rec->left; } else if (!rec->left && rec->right) { *entp = rec->right; } else if (!rec->left) { *entp = NULL; } else { Record* pivot = rec->left; if (pivot->right) { Record** pentp = &pivot->right; pivot = pivot->right; while (pivot->right) { pentp = &pivot->right; pivot = pivot->right; } *entp = pivot; *pentp = pivot->left; pivot->left = rec->left; pivot->right = rec->right; } else { *entp = pivot; pivot->right = rec->right; } } slot->count--; slot->size -= sizeof(Record) + rksiz + rec->vsiz; xfree(rec); } else { bool adj = false; if (vbuf != Visitor::NOP) { char* zbuf = NULL; size_t zsiz = 0; if (comp) { zbuf = comp->compress(vbuf, vsiz, &zsiz); if (zbuf) { vbuf = zbuf; vsiz = zsiz; } } if (tran_) { TranLog log(kbuf, ksiz, dbuf + rksiz, rec->vsiz); slot->trlogs.push_back(log); } else { adj = vsiz > rec->vsiz; } slot->size -= rec->vsiz; slot->size += vsiz; if (vsiz > rec->vsiz) { Record* old = rec; rec = (Record*)xrealloc(rec, sizeof(*rec) + ksiz + vsiz); if (rec != old) { if (!curs_.empty()) adjust_cursors(old, rec); if (slot->first == old) slot->first = rec; if (slot->last == old) slot->last = rec; *entp = rec; if (rec->prev) rec->prev->next = rec; if (rec->next) rec->next->prev = rec; dbuf = (char*)rec + sizeof(*rec); } } std::memcpy(dbuf + ksiz, vbuf, vsiz); rec->vsiz = vsiz; delete[] zbuf; } if (rtt && slot->last != rec) { if (!curs_.empty()) escape_cursors(rec); if (slot->first == rec) slot->first = rec->next; if (rec->prev) rec->prev->next = rec->next; if (rec->next) rec->next->prev = rec->prev; rec->prev = slot->last; rec->next = NULL; slot->last->next = rec; slot->last = rec; } if (adj) adjust_slot_capacity(slot); } return; } } } size_t vsiz; const char* vbuf = visitor->visit_empty(kbuf, ksiz, &vsiz); if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) { char* zbuf = NULL; size_t zsiz = 0; if (comp) { zbuf = comp->compress(vbuf, vsiz, &zsiz); if (zbuf) { vbuf = zbuf; vsiz = zsiz; } } if (tran_) { TranLog log(kbuf, ksiz); slot->trlogs.push_back(log); } slot->size += sizeof(Record) + ksiz + vsiz; rec = (Record*)xmalloc(sizeof(*rec) + ksiz + vsiz); char* dbuf = (char*)rec + sizeof(*rec); std::memcpy(dbuf, kbuf, ksiz); rec->ksiz = ksiz | fhash; std::memcpy(dbuf + ksiz, vbuf, vsiz); rec->vsiz = vsiz; rec->left = NULL; rec->right = NULL; rec->prev = slot->last; rec->next = NULL; *entp = rec; if (!slot->first) slot->first = rec; if (slot->last) slot->last->next = rec; slot->last = rec; slot->count++; if (!tran_) adjust_slot_capacity(slot); delete[] zbuf; } } /** * Get the number of records. * @return the number of records, or -1 on failure. */ int64_t count_impl() { _assert_(true); int64_t sum = 0; for (int32_t i = 0; i < SLOTNUM; i++) { Slot* slot = slots_ + i; ScopedMutex lock(&slot->lock); sum += slot->count; } return sum; } /** * Get the size of the database file. * @return the size of the database file in bytes. */ int64_t size_impl() { _assert_(true); int64_t sum = sizeof(*this); for (int32_t i = 0; i < SLOTNUM; i++) { Slot* slot = slots_ + i; ScopedMutex lock(&slot->lock); sum += slot->bnum * sizeof(Record*); sum += slot->size; } return sum; } /** * Initialize a slot table. * @param slot the slot table. * @param bnum the number of buckets. * @param capcnt the capacity of record number. * @param capsiz the capacity of memory usage. */ void initialize_slot(Slot* slot, size_t bnum, size_t capcnt, size_t capsiz) { _assert_(slot); Record** buckets; if (bnum >= ZMAPBNUM) { buckets = (Record**)mapalloc(sizeof(*buckets) * bnum); } else { buckets = new Record*[bnum]; for (size_t i = 0; i < bnum; i++) { buckets[i] = NULL; } } slot->buckets = buckets; slot->bnum = bnum; slot->capcnt = capcnt; slot->capsiz = capsiz; slot->first = NULL; slot->last = NULL; slot->count = 0; slot->size = 0; } /** * Destroy a slot table. * @param slot the slot table. */ void destroy_slot(Slot* slot) { _assert_(slot); slot->trlogs.clear(); Record* rec = slot->last; while (rec) { Record* prev = rec->prev; xfree(rec); rec = prev; } if (slot->bnum >= ZMAPBNUM) { mapfree(slot->buckets); } else { delete[] slot->buckets; } } /** * Clear a slot table. * @param slot the slot table. */ void clear_slot(Slot* slot) { _assert_(slot); Record* rec = slot->last; while (rec) { if (tran_) { uint32_t rksiz = rec->ksiz & KSIZMAX; char* dbuf = (char*)rec + sizeof(*rec); TranLog log(dbuf, rksiz, dbuf + rksiz, rec->vsiz); slot->trlogs.push_back(log); } Record* prev = rec->prev; xfree(rec); rec = prev; } Record** buckets = slot->buckets; size_t bnum = slot->bnum; for (size_t i = 0; i < bnum; i++) { buckets[i] = NULL; } slot->first = NULL; slot->last = NULL; slot->count = 0; slot->size = 0; } /** * Apply transaction logs of a slot table. * @param slot the slot table. */ void apply_slot_trlogs(Slot* slot) { _assert_(slot); const TranLogList& logs = slot->trlogs; TranLogList::const_iterator it = logs.end(); TranLogList::const_iterator itbeg = logs.begin(); while (it != itbeg) { --it; const char* kbuf = it->key.c_str(); size_t ksiz = it->key.size(); const char* vbuf = it->value.c_str(); size_t vsiz = it->value.size(); uint64_t hash = hash_record(kbuf, ksiz) / SLOTNUM; if (it->full) { Setter setter(vbuf, vsiz); accept_impl(slot, hash, kbuf, ksiz, &setter, NULL, false); } else { Remover remover; accept_impl(slot, hash, kbuf, ksiz, &remover, NULL, false); } } } /** * Addjust a slot table to the capacity. * @param slot the slot table. */ void adjust_slot_capacity(Slot* slot) { _assert_(slot); if ((slot->count > slot->capcnt || slot->size > slot->capsiz) && slot->first) { Record* rec = slot->first; uint32_t rksiz = rec->ksiz & KSIZMAX; char* dbuf = (char*)rec + sizeof(*rec); char stack[RECBUFSIZ]; char* kbuf = rksiz > sizeof(stack) ? new char[rksiz] : stack; std::memcpy(kbuf, dbuf, rksiz); uint64_t hash = hash_record(kbuf, rksiz) / SLOTNUM; Remover remover; accept_impl(slot, hash, dbuf, rksiz, &remover, NULL, false); if (kbuf != stack) delete[] kbuf; } } /** * Get the hash value of a record. * @param kbuf the pointer to the key region. * @param ksiz the size of the key region. * @return the hash value. */ uint64_t hash_record(const char* kbuf, size_t ksiz) { _assert_(kbuf && ksiz <= MEMMAXSIZ); return hashmurmur(kbuf, ksiz); } /** * Fold a hash value into a small number. * @param hash the hash number. * @return the result number. */ uint32_t fold_hash(uint64_t hash) { _assert_(true); return ((hash & 0xffffffff00000000ULL) >> 32) ^ ((hash & 0x0000ffffffff0000ULL) >> 16) ^ ((hash & 0x000000000000ffffULL) << 16) ^ ((hash & 0x00000000ffff0000ULL) >> 0); } /** * Compare two keys in lexical order. * @param abuf one key. * @param asiz the size of the one key. * @param bbuf the other key. * @param bsiz the size of the other key. * @return positive if the former is big, or negative if the latter is big, or 0 if both are * equivalent. */ int32_t compare_keys(const char* abuf, size_t asiz, const char* bbuf, size_t bsiz) { _assert_(abuf && asiz <= MEMMAXSIZ && bbuf && bsiz <= MEMMAXSIZ); if (asiz != bsiz) return (int32_t)asiz - (int32_t)bsiz; return std::memcmp(abuf, bbuf, asiz); } /** * Escape cursors on a shifted or removed records. * @param rec the record. */ void escape_cursors(Record* rec) { _assert_(rec); ScopedMutex lock(&flock_); if (curs_.empty()) return; CursorList::const_iterator cit = curs_.begin(); CursorList::const_iterator citend = curs_.end(); while (cit != citend) { Cursor* cur = *cit; if (cur->rec_ == rec) cur->step_impl(); ++cit; } } /** * Adjust cursors on re-allocated records. * @param orec the old address. * @param nrec the new address. */ void adjust_cursors(Record* orec, Record* nrec) { _assert_(orec && nrec); ScopedMutex lock(&flock_); if (curs_.empty()) return; CursorList::const_iterator cit = curs_.begin(); CursorList::const_iterator citend = curs_.end(); while (cit != citend) { Cursor* cur = *cit; if (cur->rec_ == orec) cur->rec_ = nrec; ++cit; } } /** * Disable all cursors. */ void disable_cursors() { _assert_(true); ScopedMutex lock(&flock_); CursorList::const_iterator cit = curs_.begin(); CursorList::const_iterator citend = curs_.end(); while (cit != citend) { Cursor* cur = *cit; cur->sidx_ = -1; cur->rec_ = NULL; ++cit; } } /** Dummy constructor to forbid the use. */ CacheDB(const CacheDB&); /** Dummy Operator to forbid the use. */ CacheDB& operator =(const CacheDB&); /** The method lock. */ RWLock mlock_; /** The file lock. */ Mutex flock_; /** The last happened error. */ TSD error_; /** The internal logger. */ Logger* logger_; /** The kinds of logged messages. */ uint32_t logkinds_; /** The internal meta operation trigger. */ MetaTrigger* mtrigger_; /** The open mode. */ uint32_t omode_; /** The cursor objects. */ CursorList curs_; /** The path of the database file. */ std::string path_; /** The database type. */ uint8_t type_; /** The options. */ uint8_t opts_; /** The bucket number. */ int64_t bnum_; /** The capacity of record number. */ int64_t capcnt_; /** The capacity of memory usage. */ int64_t capsiz_; /** The opaque data. */ char opaque_[OPAQUESIZ]; /** The embedded data compressor. */ Compressor* embcomp_; /** The data compressor. */ Compressor* comp_; /** The slot tables. */ Slot slots_[SLOTNUM]; /** The flag whether in LRU rotation. */ bool rttmode_; /** The flag whether in transaction. */ bool tran_; }; /** An alias of the cache tree database. */ typedef PlantDB GrassDB; } // common namespace #endif // duplication check // END OF FILE