/*************************************************************************************************
* Plant database
* Copyright (C) 2009-2012 FAL Labs
* This file is part of Kyoto Cabinet.
* This program is free software: you can redistribute it and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation, either version
* 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
* You should have received a copy of the GNU General Public License along with this program.
* If not, see <http://www.gnu.org/licenses/>.
*************************************************************************************************/
#ifndef _KCPLANTDB_H // duplication check
#define _KCPLANTDB_H
#include <kccommon.h>
#include <kcutil.h>
#include <kcthread.h>
#include <kcfile.h>
#include <kccompress.h>
#include <kccompare.h>
#include <kcmap.h>
#include <kcregex.h>
#include <kcdb.h>
#define KCPDBMETAKEY "@" ///< key of the record for meta data
#define KCPDBTMPPATHEXT "tmpkct" ///< extension of the temporary file
#define KCPDRECBUFSIZ 128 ///< size of the record buffer
namespace kyotocabinet { // common namespace
/**
* Plant database.
* @param BASEDB a class compatible with the file hash database class.
* @param DBTYPE the database type number of the class.
* @note This class template is a template for concrete classes to operate tree databases.
* Template instance classes can be inherited but overwriting methods is forbidden. The class
* TreeDB is the instance of the file tree database. The class ForestDB is the instance of the
* directory tree database. Before every database operation, it is necessary to call the
* BasicDB::open method in order to open a database file and connect the database object to it.
* To avoid data missing or corruption, it is important to close every database file by the
* BasicDB::close method when the database is no longer in use. It is forbidden for multiple
* database objects in a process to open the same database at the same time. It is forbidden to
* share a database object with child processes.
*/
// The template parameter list was lost in transit; it is restored here to match
// the documented parameters above (@param BASEDB, @param DBTYPE).
template <class BASEDB, uint8_t DBTYPE>
class PlantDB : public BasicDB {
public:
class Cursor;
private:
// Internal data structures; their definitions appear later in this file.
struct Record;
struct RecordComparator;
struct LeafNode;
struct Link;
struct LinkComparator;
struct InnerNode;
struct LeafSlot;
struct InnerSlot;
class ScopedVisitor;
/** An alias of array of records. */
typedef std::vector RecordArray;
/** An alias of array of records. */
typedef std::vector LinkArray;
/** An alias of leaf node cache. */
typedef LinkedHashMap LeafCache;
/** An alias of inner node cache. */
typedef LinkedHashMap InnerCache;
/** An alias of list of cursors. */
typedef std::list CursorList;
// ---- tuning defaults and fixed layout parameters of the B+ tree ----
/** The number of cache slots. */
static const int32_t SLOTNUM = 16;
/** The default alignment power. */
static const uint8_t DEFAPOW = 8;
/** The default free block pool power. */
static const uint8_t DEFFPOW = 10;
/** The default bucket number. */
static const int64_t DEFBNUM = 64LL << 10;
/** The default page size. */
static const int32_t DEFPSIZ = 8192;
/** The default capacity size of the page cache. */
static const int64_t DEFPCCAP = 64LL << 20;
/** The size of the header. */
static const int64_t HEADSIZ = 80;
/** The offset of the numbers. */
static const int64_t MOFFNUMS = 8;
/** The prefix of leaf nodes. */
static const char LNPREFIX = 'L';
/** The prefix of inner nodes. */
static const char INPREFIX = 'I';
/** The average number of ways of each node. */
static const size_t AVGWAY = 16;
/** The ratio of the warm cache. */
static const size_t WARMRATIO = 4;
/** The ratio of flushing inner nodes. */
static const size_t INFLRATIO = 32;
/** The default number of items in each leaf node. */
static const size_t DEFLINUM = 64;
/** The default number of items in each inner node. */
static const size_t DEFIINUM = 128;
/** The base ID number for inner nodes.  IDs below this are leaf nodes. */
static const int64_t INIDBASE = 1LL << 48;
/** The minimum number of links in each inner node. */
static const size_t INLINKMIN = 8;
/** The maximum level of B+ tree. */
static const int32_t LEVELMAX = 16;
/** The number of cached nodes for auto transaction. */
static const int32_t ATRANCNUM = 256;
/** The threshold of busy loop and sleep for locking. */
static const uint32_t LOCKBUSYLOOP = 8192;
public:
/**
* Cursor to indicate a record.
*/
class Cursor : public BasicDB::Cursor {
friend class PlantDB;
public:
/**
* Constructor.
* @param db the container database object.
*/
explicit Cursor(PlantDB* db) :
db_(db), stack_(), kbuf_(NULL), ksiz_(0), lid_(0), back_(false) {
_assert_(db);
// register in the database's cursor list so the database destructor can
// invalidate this cursor (by clearing db_) if it outlives the database
db_->curs_.push_back(this);
}
/**
* Destructor.
*/
virtual ~Cursor() {
_assert_(true);
// db_ is NULL when the database object was destroyed before this cursor
if (!db_) return;
// release any heap-allocated key buffer
if (kbuf_) clear_position();
// deregister from the database's cursor list
db_->curs_.remove(this);
}
/**
* Accept a visitor to the current record.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @param step true to move the cursor to the next record, or false for no move.
* @return true on success, or false on failure.
* @note The operation for each record is performed atomically and other threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool accept(Visitor* visitor, bool writable = true, bool step = false) {
_assert_(visitor);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (writable && !(db_->writer_)) {
db_->set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
if (!kbuf_) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
bool err = false;
bool hit = false;
// fast path: revisit the leaf node remembered from the last access (lid_)
if (lid_ > 0 && !accept_spec(visitor, writable, step, &hit)) err = true;
if (!err && !hit) {
// slow path: full tree descent, retried while the record is found to
// have moved to an adjacent leaf node
if (kbuf_) {
bool retry = true;
while (!err && retry) {
if (!accept_atom(visitor, step, &retry)) err = true;
}
} else {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
err = true;
}
}
return !err;
}
/**
* Jump the cursor to the first record for forward scan.
* @return true on success, or false on failure.
*/
bool jump() {
  _assert_(true);
  if (db_->omode_ == 0) {
    db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  back_ = false;
  if (kbuf_) clear_position();
  // land on the first record of the first non-empty leaf in the chain
  return set_position(db_->first_);
}
/**
* Jump the cursor to a record for forward scan.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return true on success, or false on failure.
*/
bool jump(const char* kbuf, size_t ksiz) {
  _assert_(kbuf && ksiz <= MEMMAXSIZ);
  if (db_->omode_ == 0) {
    db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  back_ = false;
  if (kbuf_) clear_position();
  // remember the key, then snap to the first record not less than it
  set_position(kbuf, ksiz, 0);
  if (adjust_position()) return true;
  if (kbuf_) clear_position();
  return false;
}
/**
* Jump the cursor to a record for forward scan.
* @note Equal to the original Cursor::jump method except that the parameter is std::string.
*/
bool jump(const std::string& key) {
  _assert_(true);
  // delegate to the raw-buffer overload
  const char* kbuf = key.c_str();
  const size_t ksiz = key.size();
  return jump(kbuf, ksiz);
}
/**
* Jump the cursor to the last record for backward scan.
* @return true on success, or false on failure.
* @note This method is dedicated to tree databases. Some database types, especially hash
* databases, may provide a dummy implementation.
*/
bool jump_back() {
  _assert_(true);
  if (db_->omode_ == 0) {
    db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  back_ = true;
  if (kbuf_) clear_position();
  // land on the last record of the last non-empty leaf in the chain
  return set_position_back(db_->last_);
}
/**
* Jump the cursor to a record for backward scan.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @return true on success, or false on failure.
*/
bool jump_back(const char* kbuf, size_t ksiz) {
_assert_(kbuf && ksiz <= MEMMAXSIZ);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
back_ = true;
if (kbuf_) clear_position();
set_position(kbuf, ksiz, 0);
bool err = false;
if (adjust_position()) {
// adjust_position lands on the first record not less than the key; for a
// backward scan, step back once when that record is strictly greater
if (db_->reccomp_.comp->compare(kbuf, ksiz, kbuf_, ksiz_) < 0) {
bool hit = false;
// try the cached leaf first, then fall back to a full tree search
if (lid_ > 0 && !back_position_spec(&hit)) err = true;
if (!err && !hit) {
if (kbuf_) {
if (!back_position_atom()) err = true;
} else {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
err = true;
}
}
}
} else {
// the key is beyond the last record: start from the tail of the database
if (kbuf_) clear_position();
if (!set_position_back(db_->last_)) err = true;
}
return !err;
}
/**
* Jump the cursor to a record for backward scan.
* @note Equal to the original Cursor::jump_back method except that the parameter is
* std::string.
*/
bool jump_back(const std::string& key) {
  _assert_(true);
  // delegate to the raw-buffer overload
  const char* kbuf = key.c_str();
  const size_t ksiz = key.size();
  return jump_back(kbuf, ksiz);
}
/**
* Step the cursor to the next record.
* @return true on success, or false on failure.
*/
bool step() {
_assert_(true);
back_ = false;
DB::Visitor visitor;
if (!accept(&visitor, false, true)) return false;
if (!kbuf_) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
return true;
}
/**
* Step the cursor to the previous record.
* @return true on success, or false on failure.
*/
bool step_back() {
_assert_(true);
if (db_->omode_ == 0) {
db_->set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (!kbuf_) {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
return false;
}
back_ = true;
bool err = false;
bool hit = false;
// fast path: step within the leaf node remembered from the last access
if (lid_ > 0 && !back_position_spec(&hit)) err = true;
if (!err && !hit) {
// slow path: locate the previous record with a full tree descent
if (kbuf_) {
if (!back_position_atom()) err = true;
} else {
db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
err = true;
}
}
return !err;
}
/**
* Get the database object.
* @return the database object.
*/
PlantDB* db() {
  _assert_(true);
  // hand back the owning database (NULL after the database was destroyed)
  PlantDB* owner = db_;
  return owner;
}
/**
* Clear the position.
*/
void clear_position() {
  _assert_(true);
  // the key buffer lives on the heap only when it did not fit into stack_
  char* heapbuf = (kbuf_ == stack_) ? NULL : kbuf_;
  delete[] heapbuf;  // deleting NULL is a no-op
  kbuf_ = NULL;
  lid_ = 0;
}
/**
* Set the current position.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param id the ID of the current node.
*/
void set_position(const char* kbuf, size_t ksiz, int64_t id) {
_assert_(kbuf);
// NOTE: kbuf_ is overwritten without being freed; when a position is already
// set, the caller must invoke clear_position() first, or a heap-allocated
// key buffer (keys larger than stack_) leaks.
kbuf_ = ksiz > sizeof(stack_) ? new char[ksiz] : stack_;
ksiz_ = ksiz;
std::memcpy(kbuf_, kbuf, ksiz);
lid_ = id;
}
/**
* Set the current position with a record.
* @param rec the current record.
* @param id the ID of the current node.
*/
void set_position(Record* rec, int64_t id) {
  _assert_(rec);
  // the key bytes are stored immediately after the record header
  const char* kbuf = (const char*)rec + sizeof(*rec);
  set_position(kbuf, rec->ksiz, id);
}
/**
* Set the current position at the next node.
* @param id the ID of the next node.
* @return true on success, or false on failure.
*/
bool set_position(int64_t id) {
  _assert_(true);
  // walk the leaf chain forward until a non-empty node appears
  for (int64_t cid = id; cid > 0;) {
    LeafNode* node = db_->load_leaf_node(cid, false);
    if (!node) {
      db_->set_error(_KCCODELINE_, Error::BROKEN, "missing leaf node");
      db_->db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)cid);
      return false;
    }
    RecordArray& recs = node->recs;
    if (recs.empty()) {
      cid = node->next;
      continue;
    }
    set_position(recs.front(), cid);
    return true;
  }
  db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
  return false;
}
/**
* Set the current position at the previous node.
* @param id the ID of the previous node.
* @return true on success, or false on failure.
*/
bool set_position_back(int64_t id) {
  _assert_(true);
  // walk the leaf chain backward until a non-empty node appears
  for (int64_t cid = id; cid > 0;) {
    LeafNode* node = db_->load_leaf_node(cid, false);
    if (!node) {
      db_->set_error(_KCCODELINE_, Error::BROKEN, "missing leaf node");
      db_->db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)cid);
      return false;
    }
    RecordArray& recs = node->recs;
    if (recs.empty()) {
      cid = node->prev;
      continue;
    }
    set_position(recs.back(), cid);
    return true;
  }
  db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
  return false;
}
/**
* Accept a visitor to the current record speculatively.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @param step true to move the cursor to the next record, or false for no move.
* @param hitp the pointer to the variable for the hit flag.
* @return true on success, or false on failure.
*/
bool accept_spec(Visitor* visitor, bool writable, bool step, bool* hitp) {
  _assert_(visitor && hitp);
  bool err = false;
  bool hit = false;
  // build a search record for the current key (stack buffer when it fits)
  char rstack[KCPDRECBUFSIZ];
  size_t rsiz = sizeof(Record) + ksiz_;
  char* rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
  Record* rec = (Record*)rbuf;
  rec->ksiz = ksiz_;
  rec->vsiz = 0;
  std::memcpy(rbuf + sizeof(*rec), kbuf_, ksiz_);
  LeafNode* node = db_->load_leaf_node(lid_, false);
  if (node) {
    char lstack[KCPDRECBUFSIZ];
    char* lbuf = NULL;
    size_t lsiz = 0;
    Link* link = NULL;
    int64_t hist[LEVELMAX];
    int32_t hnum = 0;
    RecordArray& recs = node->recs;
    if (!recs.empty()) {
      Record* frec = recs.front();
      Record* lrec = recs.back();
      // the speculation succeeds only if the key lies inside the cached leaf's range
      if (!db_->reccomp_(rec, frec) && !db_->reccomp_(lrec, rec)) {
        typename RecordArray::iterator ritend = recs.end();
        typename RecordArray::iterator rit = std::lower_bound(recs.begin(), ritend,
                                                              rec, db_->reccomp_);
        if (rit != ritend) {
          hit = true;
          if (db_->reccomp_(rec, *rit)) {
            // the exact key vanished: snap the cursor to its successor and
            // rebuild the search record to match the new current key
            clear_position();
            set_position(*rit, node->id);
            if (rbuf != rstack) delete[] rbuf;
            rsiz = sizeof(Record) + ksiz_;
            rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
            rec = (Record*)rbuf;
            rec->ksiz = ksiz_;
            rec->vsiz = 0;
            std::memcpy(rbuf + sizeof(*rec), kbuf_, ksiz_);
          }
          rec = *rit;
          char* kbuf = (char*)rec + sizeof(*rec);
          size_t ksiz = rec->ksiz;
          size_t vsiz;
          const char* vbuf = visitor->visit_full(kbuf, ksiz, kbuf + ksiz,
                                                 rec->vsiz, &vsiz);
          if (vbuf == Visitor::REMOVE) {
            rsiz = sizeof(*rec) + rec->ksiz + rec->vsiz;
            db_->count_ -= 1;
            db_->cusage_ -= rsiz;
            node->size -= rsiz;
            node->dirty = true;
            if (recs.size() <= 1) {
              // the leaf becomes empty: prepare a link for tree reorganization
              lsiz = sizeof(Link) + ksiz;
              lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
              link = (Link*)lbuf;
              link->child = 0;
              link->ksiz = ksiz;
              std::memcpy(lbuf + sizeof(*link), kbuf, ksiz);
            }
            xfree(rec);
            if (back_) {
              if (rit == recs.begin()) {
                step = true;
              } else {
                typename RecordArray::iterator ritprev = rit - 1;
                // BUGFIX: release the old key buffer before overwriting the
                // position (set_position does not free kbuf_); the forward
                // branch below already did this, the backward one leaked.
                clear_position();
                set_position(*ritprev, node->id);
                step = false;
              }
            } else {
              typename RecordArray::iterator ritnext = rit + 1;
              if (ritnext == ritend) {
                step = true;
              } else {
                clear_position();
                set_position(*ritnext, node->id);
                step = false;
              }
            }
            recs.erase(rit);
          } else if (vbuf != Visitor::NOP) {
            // replace the value, growing the record region when necessary
            int64_t diff = (int64_t)vsiz - (int64_t)rec->vsiz;
            db_->cusage_ += diff;
            node->size += diff;
            node->dirty = true;
            if (vsiz > rec->vsiz) {
              *rit = (Record*)xrealloc(rec, sizeof(*rec) + rec->ksiz + vsiz);
              rec = *rit;
              kbuf = (char*)rec + sizeof(*rec);
            }
            std::memcpy(kbuf + rec->ksiz, vbuf, vsiz);
            rec->vsiz = vsiz;
            if (node->size > db_->psiz_ && recs.size() > 1) {
              // the page overflowed: prepare a link so the leaf can be divided
              lsiz = sizeof(Link) + ksiz;
              lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
              link = (Link*)lbuf;
              link->child = 0;
              link->ksiz = ksiz;
              std::memcpy(lbuf + sizeof(*link), kbuf, ksiz);
            }
          }
          if (step) {
            if (back_) {
              if (rit != recs.begin()) {
                --rit;
                // BUGFIX: release the old key buffer first, mirroring the
                // forward branch below.
                clear_position();
                set_position(*rit, node->id);
                step = false;
              }
            } else {
              ++rit;
              if (rit != ritend) {
                clear_position();
                set_position(*rit, node->id);
                step = false;
              }
            }
          }
        }
      }
    }
    bool atran = db_->autotran_ && !db_->tran_ && node->dirty;
    bool async = db_->autosync_ && !db_->autotran_ && !db_->tran_ && node->dirty;
    if (hit && step) {
      // stepping over the edge of the leaf: move to the adjacent node
      clear_position();
      if (back_) {
        set_position_back(node->prev);
      } else {
        set_position(node->next);
      }
    }
    if (hit) {
      bool flush = db_->cusage_ > db_->pccap_;
      if (link || flush || async) {
        int64_t id = node->id;
        if (atran && !link && !db_->fix_auto_transaction_leaf(node)) err = true;
        if (link) {
          // the leaf became empty: reorganize the tree along the search path
          node = db_->search_tree(link, true, hist, &hnum);
          if (node) {
            if (!db_->reorganize_tree(node, hist, hnum)) err = true;
            if (atran && !db_->tran_ && !db_->fix_auto_transaction_tree()) err = true;
          } else {
            db_->set_error(_KCCODELINE_, Error::BROKEN, "search failed");
            err = true;
          }
        } else if (flush) {
          // the page cache is over capacity: flush this slot
          int32_t idx = id % SLOTNUM;
          LeafSlot* lslot = db_->lslots_ + idx;
          if (!db_->flush_leaf_cache_part(lslot)) err = true;
          InnerSlot* islot = db_->islots_ + idx;
          if (islot->warm->count() > lslot->warm->count() + lslot->hot->count() + 1 &&
              !db_->flush_inner_cache_part(islot)) err = true;
        }
        if (async && !db_->fix_auto_synchronization()) err = true;
      } else {
        // BUGFIX: gate on atran like every other call site; previously the auto
        // transaction was fixed even when auto transaction was disabled or the
        // node was clean.
        if (atran && !db_->fix_auto_transaction_leaf(node)) err = true;
      }
    }
    if (lbuf != lstack) delete[] lbuf;
  }
  if (rbuf != rstack) delete[] rbuf;
  *hitp = hit;
  return !err;
}
/**
* Accept a visitor to the current record atomically.
* @param visitor a visitor object.
* @param step true to move the cursor to the next record, or false for no move.
* @param retryp the pointer to the variable for the retry flag.
* @return true on success, or false on failure.
*/
bool accept_atom(Visitor* visitor, bool step, bool *retryp) {
  _assert_(visitor && retryp);
  bool err = false;
  bool reorg = false;
  *retryp = false;
  // build a link for the current key to descend the tree
  char lstack[KCPDRECBUFSIZ];
  size_t lsiz = sizeof(Link) + ksiz_;
  char* lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
  Link* link = (Link*)lbuf;
  link->child = 0;
  link->ksiz = ksiz_;
  std::memcpy(lbuf + sizeof(*link), kbuf_, ksiz_);
  int64_t hist[LEVELMAX];
  int32_t hnum = 0;
  LeafNode* node = db_->search_tree(link, true, hist, &hnum);
  if (!node) {
    db_->set_error(_KCCODELINE_, Error::BROKEN, "search failed");
    if (lbuf != lstack) delete[] lbuf;
    return false;
  }
  if (node->recs.empty()) {
    // the leaf is empty: skip to the next non-empty leaf and search again so
    // that the history buffer matches the node actually holding the cursor
    if (lbuf != lstack) delete[] lbuf;
    // BUGFIX: reset the pointer after freeing it; the original kept the stale
    // pointer and the cleanup at the end of the function deleted it a second
    // time (double free for keys larger than the stack buffer)
    lbuf = lstack;
    clear_position();
    if (!set_position(node->next)) return false;
    node = db_->load_leaf_node(lid_, false);
    if (!node) {
      db_->set_error(_KCCODELINE_, Error::BROKEN, "search failed");
      return false;
    }
    lsiz = sizeof(Link) + ksiz_;
    // BUGFIX: reuse the outer lbuf/link instead of declaring shadowing locals;
    // the shadowed heap buffer leaked on the success path because the final
    // cleanup only saw the outer variable
    lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
    link = (Link*)lbuf;
    link->child = 0;
    link->ksiz = ksiz_;
    std::memcpy(lbuf + sizeof(*link), kbuf_, ksiz_);
    node = db_->search_tree(link, true, hist, &hnum);
    // BUGFIX: guard against a failed search before dereferencing the node
    if (!node || node->id != lid_) {
      db_->set_error(_KCCODELINE_, Error::BROKEN, "invalid tree");
      if (lbuf != lstack) delete[] lbuf;
      return false;
    }
  }
  // build a search record for the current key
  char rstack[KCPDRECBUFSIZ];
  size_t rsiz = sizeof(Record) + ksiz_;
  char* rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
  Record* rec = (Record*)rbuf;
  rec->ksiz = ksiz_;
  rec->vsiz = 0;
  std::memcpy(rbuf + sizeof(*rec), kbuf_, ksiz_);
  RecordArray& recs = node->recs;
  typename RecordArray::iterator ritend = recs.end();
  typename RecordArray::iterator rit = std::lower_bound(recs.begin(), ritend,
                                                        rec, db_->reccomp_);
  if (rit != ritend) {
    if (db_->reccomp_(rec, *rit)) {
      // the exact key vanished: snap the cursor to its successor and rebuild
      // the search record to match the new current key
      clear_position();
      set_position(*rit, node->id);
      if (rbuf != rstack) delete[] rbuf;
      rsiz = sizeof(Record) + ksiz_;
      rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
      rec = (Record*)rbuf;
      rec->ksiz = ksiz_;
      rec->vsiz = 0;
      std::memcpy(rbuf + sizeof(*rec), kbuf_, ksiz_);
    }
    rec = *rit;
    char* kbuf = (char*)rec + sizeof(*rec);
    size_t ksiz = rec->ksiz;
    size_t vsiz;
    const char* vbuf = visitor->visit_full(kbuf, ksiz, kbuf + ksiz,
                                           rec->vsiz, &vsiz);
    if (vbuf == Visitor::REMOVE) {
      rsiz = sizeof(*rec) + rec->ksiz + rec->vsiz;
      db_->count_ -= 1;
      db_->cusage_ -= rsiz;
      node->size -= rsiz;
      node->dirty = true;
      xfree(rec);
      step = false;
      clear_position();
      if (back_) {
        if (rit == recs.begin()) {
          set_position_back(node->prev);
        } else {
          typename RecordArray::iterator ritprev = rit - 1;
          set_position(*ritprev, node->id);
        }
      } else {
        typename RecordArray::iterator ritnext = rit + 1;
        if (ritnext == ritend) {
          set_position(node->next);
        } else {
          set_position(*ritnext, node->id);
        }
      }
      recs.erase(rit);
      if (recs.empty()) reorg = true;
    } else if (vbuf != Visitor::NOP) {
      // replace the value, growing the record region when necessary
      int64_t diff = (int64_t)vsiz - (int64_t)rec->vsiz;
      db_->cusage_ += diff;
      node->size += diff;
      node->dirty = true;
      if (vsiz > rec->vsiz) {
        *rit = (Record*)xrealloc(rec, sizeof(*rec) + rec->ksiz + vsiz);
        rec = *rit;
        kbuf = (char*)rec + sizeof(*rec);
      }
      std::memcpy(kbuf + rec->ksiz, vbuf, vsiz);
      rec->vsiz = vsiz;
      if (node->size > db_->psiz_ && recs.size() > 1) reorg = true;
    }
    if (step) {
      clear_position();
      if (back_) {
        if (rit == recs.begin()) {
          set_position_back(node->prev);
        } else {
          --rit;
          set_position(*rit, node->id);
        }
      } else {
        ++rit;
        if (rit == ritend) {
          set_position(node->next);
        } else {
          set_position(*rit, node->id);
        }
      }
    }
    bool atran = db_->autotran_ && !db_->tran_ && node->dirty;
    bool async = db_->autosync_ && !db_->autotran_ && !db_->tran_ && node->dirty;
    if (atran && !reorg && !db_->fix_auto_transaction_leaf(node)) err = true;
    if (reorg) {
      if (!db_->reorganize_tree(node, hist, hnum)) err = true;
      if (atran && !db_->fix_auto_transaction_tree()) err = true;
    } else if (db_->cusage_ > db_->pccap_) {
      // the page cache is over capacity: flush this slot
      int32_t idx = node->id % SLOTNUM;
      LeafSlot* lslot = db_->lslots_ + idx;
      if (!db_->flush_leaf_cache_part(lslot)) err = true;
      InnerSlot* islot = db_->islots_ + idx;
      if (islot->warm->count() > lslot->warm->count() + lslot->hot->count() + 1 &&
          !db_->flush_inner_cache_part(islot)) err = true;
    }
    if (async && !db_->fix_auto_synchronization()) err = true;
  } else {
    // the key is beyond this leaf: move to the adjacent leaf and let the
    // caller retry, guarding against looping on the same node forever
    int64_t lid = lid_;
    clear_position();
    if (back_) {
      if (set_position_back(node->prev)) {
        if (lid_ == lid) {
          db_->set_error(_KCCODELINE_, Error::BROKEN, "invalid leaf node");
          err = true;
        } else {
          *retryp = true;
        }
      } else {
        db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
        err = true;
      }
    } else {
      if (set_position(node->next)) {
        if (lid_ == lid) {
          db_->set_error(_KCCODELINE_, Error::BROKEN, "invalid leaf node");
          err = true;
        } else {
          *retryp = true;
        }
      } else {
        db_->set_error(_KCCODELINE_, Error::NOREC, "no record");
        err = true;
      }
    }
  }
  if (rbuf != rstack) delete[] rbuf;
  if (lbuf != lstack) delete[] lbuf;
  return !err;
}
/**
* Adjust the position to an existing record.
* @return true on success, or false on failure.
*/
bool adjust_position() {
_assert_(true);
// descend the tree with the current key to find the leaf which should hold it
char lstack[KCPDRECBUFSIZ];
size_t lsiz = sizeof(Link) + ksiz_;
char* lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
Link* link = (Link*)lbuf;
link->child = 0;
link->ksiz = ksiz_;
std::memcpy(lbuf + sizeof(*link), kbuf_, ksiz_);
int64_t hist[LEVELMAX];
int32_t hnum = 0;
LeafNode* node = db_->search_tree(link, true, hist, &hnum);
if (!node) {
db_->set_error(_KCCODELINE_, Error::BROKEN, "search failed");
if (lbuf != lstack) delete[] lbuf;
return false;
}
// build a search record for the current key
char rstack[KCPDRECBUFSIZ];
size_t rsiz = sizeof(Record) + ksiz_;
char* rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
Record* rec = (Record*)rbuf;
rec->ksiz = ksiz_;
rec->vsiz = 0;
std::memcpy(rbuf + sizeof(*rec), kbuf_, ksiz_);
bool err = false;
const RecordArray& recs = node->recs;
typename RecordArray::const_iterator ritend = node->recs.end();
typename RecordArray::const_iterator rit = std::lower_bound(recs.begin(), ritend,
rec, db_->reccomp_);
clear_position();
// land on the first record not less than the key, or on the next leaf node
if (rit == ritend) {
if (!set_position(node->next)) err = true;
} else {
set_position(*rit, node->id);
}
if (rbuf != rstack) delete[] rbuf;
if (lbuf != lstack) delete[] lbuf;
return !err;
}
/**
* Back the position to the previous record speculatively.
* @param hitp the pointer to the variable for the hit flag.
* @return true on success, or false on failure.
*/
bool back_position_spec(bool* hitp) {
_assert_(hitp);
bool err = false;
bool hit = false;
// build a search record for the current key
char rstack[KCPDRECBUFSIZ];
size_t rsiz = sizeof(Record) + ksiz_;
char* rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
Record* rec = (Record*)rbuf;
rec->ksiz = ksiz_;
rec->vsiz = 0;
std::memcpy(rbuf + sizeof(*rec), kbuf_, ksiz_);
LeafNode* node = db_->load_leaf_node(lid_, false);
if (node) {
RecordArray& recs = node->recs;
if (!recs.empty()) {
Record* frec = recs.front();
Record* lrec = recs.back();
if (db_->reccomp_(rec, frec)) {
// the key precedes the cached leaf: the previous record is in an
// earlier leaf node
hit = true;
clear_position();
if (!set_position_back(node->prev)) err = true;
} else if (db_->reccomp_(lrec, rec)) {
// the key follows the cached leaf: the speculation misses and the
// caller falls back to back_position_atom
} else {
hit = true;
typename RecordArray::iterator ritbeg = recs.begin();
typename RecordArray::iterator ritend = recs.end();
typename RecordArray::iterator rit = std::lower_bound(recs.begin(), ritend,
rec, db_->reccomp_);
clear_position();
if (rit == ritbeg) {
if (!set_position_back(node->prev)) err = true;
} else {
--rit;
set_position(*rit, node->id);
}
}
}
}
if (rbuf != rstack) delete[] rbuf;
*hitp = hit;
return !err;
}
/**
* Back the position to the previous record atomically.
* @return true on success, or false on failure.
*/
bool back_position_atom() {
_assert_(true);
// descend the tree with the current key to find its leaf node
char lstack[KCPDRECBUFSIZ];
size_t lsiz = sizeof(Link) + ksiz_;
char* lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
Link* link = (Link*)lbuf;
link->child = 0;
link->ksiz = ksiz_;
std::memcpy(lbuf + sizeof(*link), kbuf_, ksiz_);
int64_t hist[LEVELMAX];
int32_t hnum = 0;
LeafNode* node = db_->search_tree(link, true, hist, &hnum);
if (!node) {
db_->set_error(_KCCODELINE_, Error::BROKEN, "search failed");
if (lbuf != lstack) delete[] lbuf;
return false;
}
// build a search record for the current key
char rstack[KCPDRECBUFSIZ];
size_t rsiz = sizeof(Record) + ksiz_;
char* rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
Record* rec = (Record*)rbuf;
rec->ksiz = ksiz_;
rec->vsiz = 0;
std::memcpy(rbuf + sizeof(*rec), kbuf_, ksiz_);
bool err = false;
const RecordArray& recs = node->recs;
typename RecordArray::const_iterator ritbeg = node->recs.begin();
typename RecordArray::const_iterator ritend = node->recs.end();
typename RecordArray::const_iterator rit = std::lower_bound(recs.begin(), ritend,
rec, db_->reccomp_);
clear_position();
// move to the record just before the lower bound, crossing into the
// previous leaf when the key precedes every record of this one
if (rit == ritbeg) {
if (!set_position_back(node->prev)) err = true;
} else if (rit == ritend) {
ritend--;
set_position(*ritend, node->id);
} else {
--rit;
set_position(*rit, node->id);
}
if (rbuf != rstack) delete[] rbuf;
if (lbuf != lstack) delete[] lbuf;
return !err;
}
/** Dummy constructor to forbid the use. */
Cursor(const Cursor&);
/** Dummy Operator to forbid the use. */
Cursor& operator =(const Cursor&);
/** The inner database.  NULL after the database object has been destroyed. */
PlantDB* db_;
/** The stack buffer for the key; kbuf_ points here when the key fits. */
char stack_[KCPDRECBUFSIZ];
/** The pointer to the key region.  NULL when no position is set. */
char* kbuf_;
/** The size of the key region. */
size_t ksiz_;
/** The last visited leaf.  0 disables the speculative fast path. */
int64_t lid_;
/** The backward flag.  True while scanning toward the head of the database. */
bool back_;
};
/**
* Tuning options, forwarded verbatim from the underlying database class.
*/
enum Option {
TSMALL = BASEDB::TSMALL, ///< use 32-bit addressing
TLINEAR = BASEDB::TLINEAR, ///< use linear collision chaining
TCOMPRESS = BASEDB::TCOMPRESS ///< compress each record
};
/**
* Status flags, forwarded verbatim from the underlying database class.
*/
enum Flag {
FOPEN = BASEDB::FOPEN, ///< whether opened
FFATAL = BASEDB::FFATAL ///< whether with fatal error
};
/**
* Default constructor.
* @note Every tuning parameter starts at its default; the object is unusable
* until the BasicDB::open method is called.
*/
explicit PlantDB() :
mtrigger_(NULL), omode_(0), writer_(false), autotran_(false), autosync_(false),
db_(), curs_(), apow_(DEFAPOW), fpow_(DEFFPOW), opts_(0), bnum_(DEFBNUM),
psiz_(DEFPSIZ), pccap_(DEFPCCAP),
root_(0), first_(0), last_(0), lcnt_(0), icnt_(0), count_(0), cusage_(0),
lslots_(), islots_(), reccomp_(), linkcomp_(),
tran_(false), trclock_(0), trlcnt_(0), trcount_(0) {
_assert_(true);
}
/**
* Destructor.
* @note If the database is not closed, it is closed implicitly.
*/
virtual ~PlantDB() {
  _assert_(true);
  if (omode_ != 0) close();
  // orphan any surviving cursors so their destructors skip deregistration
  typename CursorList::const_iterator cit = curs_.begin();
  typename CursorList::const_iterator citend = curs_.end();
  for (; cit != citend; ++cit) {
    (*cit)->db_ = NULL;
  }
}
/**
* Accept a visitor to a record.
* @param kbuf the pointer to the key region.
* @param ksiz the size of the key region.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @return true on success, or false on failure.
* @note The operation for each record is performed atomically and other threads accessing the
* same record are blocked. To avoid deadlock, any explicit database operation must not be
* performed in this function.
*/
bool accept(const char* kbuf, size_t ksiz, Visitor* visitor, bool writable = true) {
  _assert_(kbuf && ksiz <= MEMMAXSIZ && visitor);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  if (writable && !writer_) {
    set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
    return false;
  }
  // build a link for the key and descend to the leaf which should contain it
  char lstack[KCPDRECBUFSIZ];
  size_t lsiz = sizeof(Link) + ksiz;
  char* lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
  Link* link = (Link*)lbuf;
  link->child = 0;
  link->ksiz = ksiz;
  std::memcpy(lbuf + sizeof(*link), kbuf, ksiz);
  int64_t hist[LEVELMAX];
  int32_t hnum = 0;
  LeafNode* node = search_tree(link, true, hist, &hnum);
  if (!node) {
    set_error(_KCCODELINE_, Error::BROKEN, "search failed");
    if (lbuf != lstack) delete[] lbuf;
    return false;
  }
  // build a search record for the key
  char rstack[KCPDRECBUFSIZ];
  size_t rsiz = sizeof(Record) + ksiz;
  char* rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
  Record* rec = (Record*)rbuf;
  rec->ksiz = ksiz;
  rec->vsiz = 0;
  std::memcpy(rbuf + sizeof(*rec), kbuf, ksiz);
  bool reorg = accept_impl(node, rec, visitor);
  bool atran = autotran_ && !tran_ && node->dirty;
  bool async = autosync_ && !autotran_ && !tran_ && node->dirty;
  bool flush = false;
  bool err = false;
  int64_t id = node->id;
  if (atran && !reorg && !fix_auto_transaction_leaf(node)) err = true;
  if (cusage_ > pccap_) {
    // the page cache is over capacity: clean this slot now, flush it below
    int32_t idx = id % SLOTNUM;
    LeafSlot* lslot = lslots_ + idx;
    if (!clean_leaf_cache_part(lslot)) err = true;
    flush = true;
  }
  if (reorg) {
    // the leaf was emptied or overflowed: re-search and reorganize the tree
    node = search_tree(link, false, hist, &hnum);
    if (node) {
      if (!reorganize_tree(node, hist, hnum)) err = true;
      if (atran && !tran_ && !fix_auto_transaction_tree()) err = true;
    } else {
      // BUGFIX: a failed re-search used to be ignored silently, reporting
      // success without reorganizing; report it like every other
      // search_tree failure.
      set_error(_KCCODELINE_, Error::BROKEN, "search failed");
      err = true;
    }
  } else if (flush) {
    int32_t idx = id % SLOTNUM;
    LeafSlot* lslot = lslots_ + idx;
    if (!flush_leaf_cache_part(lslot)) err = true;
    InnerSlot* islot = islots_ + idx;
    if (islot->warm->count() > lslot->warm->count() + lslot->hot->count() + 1 &&
        !flush_inner_cache_part(islot)) err = true;
  }
  if (rbuf != rstack) delete[] rbuf;
  if (lbuf != lstack) delete[] lbuf;
  if (async) {
    if (!fix_auto_synchronization()) err = true;
  }
  return !err;
}
/**
* Accept a visitor to multiple records at once.
* @param keys specifies a string vector of the keys.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @return true on success, or false on failure.
* @note The operations for specified records are performed atomically and other threads
* accessing the same records are blocked. To avoid deadlock, any explicit database operation
* must not be performed in this function.
*/
bool accept_bulk(const std::vector& keys, Visitor* visitor,
bool writable = true) {
_assert_(visitor);
if (omode_ == 0) {
set_error(_KCCODELINE_, Error::INVALID, "not opened");
return false;
}
if (writable && !writer_) {
set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
return false;
}
ScopedVisitor svis(visitor);
if (keys.empty()) return true;
bool err = false;
std::vector::const_iterator kit = keys.begin();
std::vector::const_iterator kitend = keys.end();
while (!err && kit != kitend) {
const char* kbuf = kit->data();
size_t ksiz = kit->size();
char lstack[KCPDRECBUFSIZ];
size_t lsiz = sizeof(Link) + ksiz;
char* lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
Link* link = (Link*)lbuf;
link->child = 0;
link->ksiz = ksiz;
std::memcpy(lbuf + sizeof(*link), kbuf, ksiz);
int64_t hist[LEVELMAX];
int32_t hnum = 0;
LeafNode* node = search_tree(link, true, hist, &hnum);
if (!node) {
set_error(_KCCODELINE_, Error::BROKEN, "search failed");
if (lbuf != lstack) delete[] lbuf;
err = true;
break;
}
char rstack[KCPDRECBUFSIZ];
size_t rsiz = sizeof(Record) + ksiz;
char* rbuf = rsiz > sizeof(rstack) ? new char[rsiz] : rstack;
Record* rec = (Record*)rbuf;
rec->ksiz = ksiz;
rec->vsiz = 0;
std::memcpy(rbuf + sizeof(*rec), kbuf, ksiz);
bool reorg = accept_impl(node, rec, visitor);
bool atran = autotran_ && !tran_ && node->dirty;
bool async = autosync_ && !autotran_ && !tran_ && node->dirty;
if (atran && !reorg && !fix_auto_transaction_leaf(node)) err = true;
if (reorg) {
if (!reorganize_tree(node, hist, hnum)) err = true;
if (atran && !fix_auto_transaction_tree()) err = true;
} else if (cusage_ > pccap_) {
int32_t idx = node->id % SLOTNUM;
LeafSlot* lslot = lslots_ + idx;
if (!flush_leaf_cache_part(lslot)) err = true;
InnerSlot* islot = islots_ + idx;
if (islot->warm->count() > lslot->warm->count() + lslot->hot->count() + 1 &&
!flush_inner_cache_part(islot)) err = true;
}
if (rbuf != rstack) delete[] rbuf;
if (lbuf != lstack) delete[] lbuf;
if (async && !fix_auto_synchronization()) err = true;
++kit;
}
return !err;
}
/**
* Iterate to accept a visitor for each record.
* @param visitor a visitor object.
* @param writable true for writable operation, or false for read-only operation.
* @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure.
* @note The whole iteration is performed atomically and other threads are blocked. To avoid
* deadlock, any explicit database operation must not be performed in this function.
*/
bool iterate(Visitor *visitor, bool writable = true, ProgressChecker* checker = NULL) {
  _assert_(visitor);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  if (writable && !writer_) {
    set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
    return false;
  }
  ScopedVisitor svis(visitor);
  int64_t allcnt = count_;
  if (checker && !checker->check("iterate", "beginning", 0, allcnt)) {
    set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
    return false;
  }
  bool err = false;
  // Wrap the whole iteration in an implicit transaction when auto
  // transaction is enabled and no explicit transaction is running.
  bool atran = false;
  if (autotran_ && writable && !tran_) {
    if (begin_transaction_impl(autosync_)) {
      atran = true;
    } else {
      err = true;
    }
  }
  // Walk the leaf chain from the first leaf, following "next" links.
  int64_t id = first_;
  int64_t flcnt = 0;
  int64_t curcnt = 0;
  while (!err && id > 0) {
    LeafNode* node = load_leaf_node(id, false);
    if (!node) {
      set_error(_KCCODELINE_, Error::BROKEN, "missing leaf node");
      db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)id);
      return false;
    }
    // Read the link before visiting: the visitor may reorganize the node.
    id = node->next;
    const RecordArray& recs = node->recs;
    // Snapshot the keys of this leaf into freshly allocated records so the
    // visitor can modify the node without invalidating the iteration.
    RecordArray keys;
    keys.reserve(recs.size());
    typename RecordArray::const_iterator rit = recs.begin();
    typename RecordArray::const_iterator ritend = recs.end();
    while (rit != ritend) {
      Record* rec = *rit;
      size_t rsiz = sizeof(*rec) + rec->ksiz;
      char* dbuf = (char*)rec + sizeof(*rec);
      Record* key = (Record*)xmalloc(rsiz);
      key->ksiz = rec->ksiz;
      key->vsiz = 0;
      char* kbuf = (char*)key + sizeof(*key);
      std::memcpy(kbuf, dbuf, rec->ksiz);
      keys.push_back(key);
      ++rit;
    }
    typename RecordArray::const_iterator kit = keys.begin();
    typename RecordArray::const_iterator kitend = keys.end();
    bool reorg = false;
    while (kit != kitend) {
      Record* rec = *kit;
      // accept_impl returns true when the node needs reorganization.
      if (accept_impl(node, rec, visitor)) reorg = true;
      curcnt++;
      if (checker && !checker->check("iterate", "processing", curcnt, allcnt)) {
        set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
        err = true;
        break;
      }
      ++kit;
    }
    if (reorg) {
      // Re-locate the node by its first key (its identity may have changed)
      // and rebalance the tree around it.
      Record* rec = keys.front();
      char* dbuf = (char*)rec + sizeof(*rec);
      char lstack[KCPDRECBUFSIZ];
      size_t lsiz = sizeof(Link) + rec->ksiz;
      char* lbuf = lsiz > sizeof(lstack) ? new char[lsiz] : lstack;
      Link* link = (Link*)lbuf;
      link->child = 0;
      link->ksiz = rec->ksiz;
      std::memcpy(lbuf + sizeof(*link), dbuf, rec->ksiz);
      int64_t hist[LEVELMAX];
      int32_t hnum = 0;
      node = search_tree(link, false, hist, &hnum);
      if (node) {
        if (!reorganize_tree(node, hist, hnum)) err = true;
      } else {
        set_error(_KCCODELINE_, Error::BROKEN, "search failed");
        err = true;
      }
      if (lbuf != lstack) delete[] lbuf;
    }
    if (cusage_ > pccap_) {
      // Page cache over capacity: flush every leaf slot partially and one
      // inner slot in round-robin order (flcnt).
      for (int32_t i = 0; i < SLOTNUM; i++) {
        LeafSlot* lslot = lslots_ + i;
        if (!flush_leaf_cache_part(lslot)) err = true;
      }
      InnerSlot* islot = islots_ + (flcnt++) % SLOTNUM;
      if (islot->warm->count() > 2 && !flush_inner_cache_part(islot)) err = true;
    }
    // Release the key snapshot of this leaf.
    kit = keys.begin();
    while (kit != kitend) {
      xfree(*kit);
      ++kit;
    }
  }
  if (checker && !checker->check("iterate", "ending", -1, allcnt)) {
    set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
    err = true;
  }
  if (atran && !commit_transaction()) err = true;
  if (autosync_ && !autotran_ && writable && !fix_auto_synchronization()) err = true;
  trigger_meta(MetaTrigger::ITERATE, "iterate");
  return !err;
}
/**
* Scan each record in parallel.
* @param visitor a visitor object.
* @param thnum the number of worker threads.
* @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure.
* @note This function is for reading records and not for updating ones. The return value of
* the visitor is just ignored. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool scan_parallel(Visitor *visitor, size_t thnum, ProgressChecker* checker = NULL) {
  _assert_(visitor && thnum <= MEMMAXSIZ);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  // Clamp the thread count into [1, INT8MAX].  The original code assigned
  // 0 here, which was a no-op and let a zero thread count through.
  if (thnum < 1) thnum = 1;
  if (thnum > (size_t)INT8MAX) thnum = INT8MAX;
  bool err = false;
  if (writer_) {
    // Write back dirty leaf pages first so the underlying scan sees them.
    if (checker && !checker->check("scan_parallel", "cleaning the leaf node cache", -1, -1)) {
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
      return false;
    }
    if (!clean_leaf_cache()) err = true;
  }
  ScopedVisitor svis(visitor);
  int64_t allcnt = count_;
  if (checker && !checker->check("scan_parallel", "beginning", 0, allcnt)) {
    set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
    return false;
  }
  // Internal checker used to cancel the underlying parallel scan when the
  // caller's checker aborts inside a worker thread.
  class ProgressCheckerImpl : public ProgressChecker {
   public:
    explicit ProgressCheckerImpl() : ok_(1) {}
    void stop() {
      ok_.set(0);
    }
   private:
    bool check(const char* name, const char* message, int64_t curcnt, int64_t allcnt) {
      return ok_ > 0;
    }
    AtomicInt64 ok_;
  };
  ProgressCheckerImpl ichecker;
  // Adapter that unpacks each raw leaf page of the underlying database and
  // feeds the contained records to the user visitor.
  class VisitorImpl : public Visitor {
   public:
    explicit VisitorImpl(PlantDB* db, Visitor* visitor,
                         ProgressChecker* checker, int64_t allcnt,
                         ProgressCheckerImpl* ichecker) :
        db_(db), visitor_(visitor), checker_(checker), allcnt_(allcnt),
        ichecker_(ichecker), error_() {}
    const Error& error() {
      return error_;
    }
   private:
    const char* visit_full(const char* kbuf, size_t ksiz,
                           const char* vbuf, size_t vsiz, size_t* sp) {
      // Only leaf-node pages are scanned; their keys start with LNPREFIX.
      if (ksiz < 2 || ksiz >= NUMBUFSIZ || kbuf[0] != LNPREFIX) return NOP;
      // Skip the "prev" and "next" leaf links at the head of the page.
      uint64_t prev;
      size_t step = readvarnum(vbuf, vsiz, &prev);
      if (step < 1) return NOP;
      vbuf += step;
      vsiz -= step;
      uint64_t next;
      step = readvarnum(vbuf, vsiz, &next);
      if (step < 1) return NOP;
      vbuf += step;
      vsiz -= step;
      // Decode each serialized record: varnum key size, varnum value size,
      // then the key and value bytes back to back.
      while (vsiz > 1) {
        uint64_t rksiz;
        step = readvarnum(vbuf, vsiz, &rksiz);
        if (step < 1) break;
        vbuf += step;
        vsiz -= step;
        uint64_t rvsiz;
        step = readvarnum(vbuf, vsiz, &rvsiz);
        if (step < 1) break;
        vbuf += step;
        vsiz -= step;
        if (vsiz < rksiz + rvsiz) break;
        size_t xvsiz;
        // Read-only scan: the visitor's return value is ignored.
        visitor_->visit_full(vbuf, rksiz, vbuf + rksiz, rvsiz, &xvsiz);
        vbuf += rksiz;
        vsiz -= rksiz;
        vbuf += rvsiz;
        vsiz -= rvsiz;
        if (checker_ && !checker_->check("scan_parallel", "processing", -1, allcnt_)) {
          db_->set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
          error_ = db_->error();
          ichecker_->stop();
          break;
        }
      }
      return NOP;
    }
    PlantDB* db_;
    Visitor* visitor_;
    ProgressChecker* checker_;
    int64_t allcnt_;
    ProgressCheckerImpl* ichecker_;
    Error error_;
  };
  VisitorImpl ivisitor(this, visitor, checker, allcnt, &ichecker);
  if (!db_.scan_parallel(&ivisitor, thnum, &ichecker)) err = true;
  // Propagate an error detected inside a worker thread.
  if (ivisitor.error() != Error::SUCCESS) {
    const Error& e = ivisitor.error();
    db_.set_error(_KCCODELINE_, e.code(), e.message());
    err = true;
  }
  if (checker && !checker->check("scan_parallel", "ending", -1, allcnt)) {
    set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
    err = true;
  }
  trigger_meta(MetaTrigger::ITERATE, "scan_parallel");
  return !err;
}
/**
* Get the last happened error.
* @return the last happened error.
*/
Error error() const {
  _assert_(true);
  // The last-happened error is tracked by the internal database object.
  return db_.error();
}
/**
* Set the error information.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param code an error code.
* @param message a supplement message.
*/
void set_error(const char* file, int32_t line, const char* func,
               Error::Code code, const char* message) {
  _assert_(file && line > 0 && func && message);
  // Delegate to the internal database, which owns the error state.
  db_.set_error(file, line, func, code, message);
}
/**
* Open a database file.
* @param path the path of a database file.
* @param mode the connection mode. BasicDB::OWRITER as a writer, BasicDB::OREADER as a
* reader. The following may be added to the writer mode by bitwise-or: BasicDB::OCREATE,
* which means it creates a new database if the file does not exist, BasicDB::OTRUNCATE, which
* means it creates a new database regardless if the file exists, BasicDB::OAUTOTRAN, which
* means each updating operation is performed in implicit transaction, BasicDB::OAUTOSYNC,
* which means each updating operation is followed by implicit synchronization with the file
* system. The following may be added to both of the reader mode and the writer mode by
* bitwise-or: BasicDB::ONOLOCK, which means it opens the database file without file locking,
* BasicDB::OTRYLOCK, which means locking is performed without blocking, BasicDB::ONOREPAIR,
* which means the database file is not repaired implicitly even if file destruction is
* detected.
* @return true on success, or false on failure.
* @note Every opened database must be closed by the BasicDB::close method when it is no
* longer in use. It is not allowed for two or more database objects in the same process to
* keep their connections to the same database file at the same time.
*/
bool open(const std::string& path, uint32_t mode = OWRITER | OCREATE) {
  _assert_(true);
  if (omode_ != 0) {
    set_error(_KCCODELINE_, Error::INVALID, "already opened");
    return false;
  }
  report(_KCCODELINE_, Logger::DEBUG, "opening the database (path=%s)", path.c_str());
  // The grass (memory tree) variant is always opened as a writable,
  // creatable database regardless of the requested mode.
  if (DBTYPE == TYPEGRASS) {
    mode &= ~OREADER;
    mode |= OWRITER | OCREATE;
  }
  writer_ = false;
  autotran_ = false;
  autosync_ = false;
  if (mode & OWRITER) {
    writer_ = true;
    if (mode & OAUTOTRAN) autotran_ = true;
    if (mode & OAUTOSYNC) autosync_ = true;
  }
  // Propagate the tuning parameters to the inner database before opening.
  if (!db_.tune_type(DBTYPE)) return false;
  if (!db_.tune_alignment(apow_)) return false;
  if (!db_.tune_fbp(fpow_)) return false;
  if (!db_.tune_options(opts_)) return false;
  if (!db_.tune_buckets(bnum_)) return false;
  if (!db_.open(path, mode)) return false;
  if (db_.type() != DBTYPE) {
    set_error(_KCCODELINE_, Error::INVALID, "invalid database type");
    db_.close();
    return false;
  }
  if (db_.reorganized()) {
    // The inner database was rebuilt: rebuild the tree structure too.
    if (!reorganize_file(mode)) return false;
  } else if (db_.recovered()) {
    // After auto-recovery the record count is unreliable; recount it,
    // temporarily reopening as a writer when opened read-only.
    if (!writer_) {
      if (!db_.close()) return false;
      uint32_t tmode = (mode & ~OREADER) | OWRITER;
      if (!db_.open(path, tmode)) return false;
    }
    if (!recalc_count()) return false;
    if (!writer_) {
      if (!db_.close()) return false;
      if (!db_.open(path, mode)) return false;
    }
    // INT64MAX marks an unrecoverable count: fall back to reorganization.
    if (count_ == INT64MAX && !reorganize_file(mode)) return false;
  }
  if (writer_ && db_.count() < 1) {
    // Brand-new database: bootstrap the meta data and the root leaf node.
    root_ = 0;
    first_ = 0;
    last_ = 0;
    count_ = 0;
    create_leaf_cache();
    create_inner_cache();
    lcnt_ = 0;
    create_leaf_node(0, 0);
    root_ = 1;
    first_ = 1;
    last_ = 1;
    lcnt_ = 1;
    icnt_ = 0;
    count_ = 0;
    if (!reccomp_.comp) reccomp_.comp = LEXICALCOMP;
    if (!dump_meta() || !flush_leaf_cache(true) || !load_meta()) {
      delete_inner_cache();
      delete_leaf_cache();
      db_.close();
      return false;
    }
  } else {
    // Existing database: read the meta data first, then set up caches.
    if (!load_meta()) {
      db_.close();
      return false;
    }
    create_leaf_cache();
    create_inner_cache();
  }
  // Sanity-check the loaded meta data before declaring the DB open.
  if (psiz_ < 1 || root_ < 1 || first_ < 1 || last_ < 1 ||
      lcnt_ < 1 || icnt_ < 0 || count_ < 0 || bnum_ < 1) {
    set_error(_KCCODELINE_, Error::BROKEN, "invalid meta data");
    db_.report(_KCCODELINE_, Logger::WARN, "psiz=%lld root=%lld first=%lld last=%lld"
               " lcnt=%lld icnt=%lld count=%lld bnum=%lld",
               (long long)psiz_, (long long)root_, (long long)first_, (long long)last_,
               (long long)lcnt_, (long long)icnt_, (long long)count_, (long long)bnum_);
    delete_inner_cache();
    delete_leaf_cache();
    db_.close();
    return false;
  }
  omode_ = mode;
  cusage_ = 0;
  tran_ = false;
  trclock_ = 0;
  trigger_meta(MetaTrigger::OPEN, "open");
  return true;
}
/**
* Close the database file.
* @return true on success, or false on failure.
*/
bool close() {
  _assert_(true);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  const std::string& path = db_.path();
  report(_KCCODELINE_, Logger::DEBUG, "closing the database (path=%s)", path.c_str());
  bool err = false;
  disable_cursors();
  // The cache usage counter must match the actual cache sizes; a mismatch
  // indicates internal corruption but closing proceeds anyway.
  int64_t lsiz = calc_leaf_cache_size();
  int64_t isiz = calc_inner_cache_size();
  if (cusage_ != lsiz + isiz) {
    set_error(_KCCODELINE_, Error::BROKEN, "invalid cache usage");
    db_.report(_KCCODELINE_, Logger::WARN, "cusage=%lld lsiz=%lld isiz=%lld",
               (long long)cusage_, (long long)lsiz, (long long)isiz);
    err = true;
  }
  // Write back every cached node; "true" means flush everything.
  if (!flush_leaf_cache(true)) err = true;
  if (!flush_inner_cache(true)) err = true;
  // After a full flush both caches must be completely empty.
  lsiz = calc_leaf_cache_size();
  isiz = calc_inner_cache_size();
  int64_t lcnt = calc_leaf_cache_count();
  int64_t icnt = calc_inner_cache_count();
  if (cusage_ != 0 || lsiz != 0 || isiz != 0 || lcnt != 0 || icnt != 0) {
    set_error(_KCCODELINE_, Error::BROKEN, "remaining cache");
    db_.report(_KCCODELINE_, Logger::WARN, "cusage=%lld lsiz=%lld isiz=%lld"
               " lcnt=%lld icnt=%lld", (long long)cusage_, (long long)lsiz, (long long)isiz,
               (long long)lcnt, (long long)icnt);
    err = true;
  }
  delete_inner_cache();
  delete_leaf_cache();
  // Persist the meta data only when opened as a writer.
  if (writer_ && !dump_meta()) err = true;
  if (!db_.close()) err = true;
  omode_ = 0;
  trigger_meta(MetaTrigger::CLOSE, "close");
  return !err;
}
/**
* Synchronize updated contents with the file and the device.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @param proc a postprocessor object. If it is NULL, no postprocessing is performed.
* @param checker a progress checker object. If it is NULL, no checking is performed.
* @return true on success, or false on failure.
* @note The operation of the postprocessor is performed atomically and other threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool synchronize(bool hard = false, FileProcessor* proc = NULL,
                 ProgressChecker* checker = NULL) {
  _assert_(true);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  bool err = false;
  if (writer_) {
    // Stage 1: write back dirty nodes (clean), then evict all cached
    // nodes (flush), then persist the meta record.  The checker is
    // consulted before each stage so the caller can abort.
    if (checker && !checker->check("synchronize", "cleaning the leaf node cache", -1, -1)) {
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
      return false;
    }
    if (!clean_leaf_cache()) err = true;
    if (checker && !checker->check("synchronize", "cleaning the inner node cache", -1, -1)) {
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
      return false;
    }
    if (!clean_inner_cache()) err = true;
    if (checker && !checker->check("synchronize", "flushing the leaf node cache", -1, -1)) {
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
      return false;
    }
    if (!flush_leaf_cache(true)) err = true;
    if (checker && !checker->check("synchronize", "flushing the inner node cache", -1, -1)) {
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
      return false;
    }
    if (!flush_inner_cache(true)) err = true;
    if (checker && !checker->check("synchronize", "dumping the meta data", -1, -1)) {
      set_error(_KCCODELINE_, Error::LOGIC, "checker failed");
      return false;
    }
    if (!dump_meta()) err = true;
  }
  // Stage 2: delegate to the inner database, substituting this tree's
  // logical record count for the inner database's page count.
  class Wrapper : public FileProcessor {
   public:
    Wrapper(FileProcessor* proc, int64_t count) : proc_(proc), count_(count) {}
   private:
    bool process(const std::string& path, int64_t count, int64_t size) {
      if (proc_) return proc_->process(path, count_, size);
      return true;
    }
    FileProcessor* proc_;
    int64_t count_;
  } wrapper(proc, count_);
  if (!db_.synchronize(hard, &wrapper, checker)) err = true;
  trigger_meta(MetaTrigger::SYNCHRONIZE, "synchronize");
  return !err;
}
/**
* Occupy database by locking and do something meanwhile.
* @param writable true to use writer lock, or false to use reader lock.
* @param proc a processor object. If it is NULL, no processing is performed.
* @return true on success, or false on failure.
* @note The operation of the processor is performed atomically and other threads accessing
* the same record are blocked. To avoid deadlock, any explicit database operation must not
* be performed in this function.
*/
bool occupy(bool writable = true, FileProcessor* proc = NULL) {
  _assert_(true);
  bool err = false;
  // Run the processor, if any, against the current file state.
  if (proc) {
    if (!proc->process(db_.path(), count_, db_.size())) {
      set_error(_KCCODELINE_, Error::LOGIC, "processing failed");
      err = true;
    }
  }
  trigger_meta(MetaTrigger::OCCUPY, "occupy");
  return !err;
}
/**
* Begin transaction.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @return true on success, or false on failure.
*/
bool begin_transaction(bool hard = false) {
  _assert_(true);
  // Spin until no other transaction is active: yield for the first
  // LOCKBUSYLOOP iterations, then back off with chill.  The open/writer
  // checks are repeated each pass because the state may change while
  // waiting.
  for (uint32_t wcnt = 0; true;) {
    if (omode_ == 0) {
      set_error(_KCCODELINE_, Error::INVALID, "not opened");
      return false;
    }
    if (!writer_) {
      set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
      return false;
    }
    if (!tran_) break;
    if (wcnt >= LOCKBUSYLOOP) {
      Thread::chill();
    } else {
      Thread::yield();
      wcnt++;
    }
  }
  if (!begin_transaction_impl(hard)) return false;
  tran_ = true;
  trigger_meta(MetaTrigger::BEGINTRAN, "begin_transaction");
  return true;
}
/**
* Try to begin transaction.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @return true on success, or false on failure.
*/
bool begin_transaction_try(bool hard = false) {
  _assert_(true);
  // Unlike begin_transaction, never wait: fail fast when another
  // transaction is already running.
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  if (!writer_) {
    set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
    return false;
  }
  if (tran_) {
    set_error(_KCCODELINE_, Error::LOGIC, "competition avoided");
    return false;
  }
  if (!begin_transaction_impl(hard)) return false;
  tran_ = true;
  trigger_meta(MetaTrigger::BEGINTRAN, "begin_transaction_try");
  return true;
}
/**
* End transaction.
* @param commit true to commit the transaction, or false to abort the transaction.
* @return true on success, or false on failure.
*/
bool end_transaction(bool commit = true) {
  _assert_(true);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  if (!tran_) {
    set_error(_KCCODELINE_, Error::INVALID, "not in transaction");
    return false;
  }
  // Either commit or roll back, then leave transaction mode regardless
  // of the outcome.
  bool err = commit ? !commit_transaction() : !abort_transaction();
  tran_ = false;
  trigger_meta(commit ? MetaTrigger::COMMITTRAN : MetaTrigger::ABORTTRAN, "end_transaction");
  return !err;
}
/**
* Remove all records.
* @return true on success, or false on failure.
*/
bool clear() {
  _assert_(true);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  if (!writer_) {
    set_error(_KCCODELINE_, Error::NOPERM, "permission denied");
    return false;
  }
  disable_cursors();
  // Discard cached nodes without writing them back ("false" = do not save).
  flush_leaf_cache(false);
  flush_inner_cache(false);
  bool err = false;
  if (!db_.clear()) err = true;
  // Re-bootstrap an empty tree: one fresh leaf node with ID 1 acting as
  // root, first and last node (same sequence as in open()).
  lcnt_ = 0;
  create_leaf_node(0, 0);
  root_ = 1;
  first_ = 1;
  last_ = 1;
  lcnt_ = 1;
  icnt_ = 0;
  count_ = 0;
  if (!dump_meta()) err = true;
  if (!flush_leaf_cache(true)) err = true;
  cusage_ = 0;
  trigger_meta(MetaTrigger::CLEAR, "clear");
  return !err;
}
/**
* Get the number of records.
* @return the number of records, or -1 on failure.
*/
int64_t count() {
  _assert_(true);
  // The record count is kept up to date in memory while the DB is open.
  if (omode_ != 0) return count_;
  set_error(_KCCODELINE_, Error::INVALID, "not opened");
  return -1;
}
/**
* Get the size of the database file.
* @return the size of the database file in bytes, or -1 on failure.
*/
int64_t size() {
  _assert_(true);
  // File size is tracked by the inner database object.
  if (omode_ != 0) return db_.size();
  set_error(_KCCODELINE_, Error::INVALID, "not opened");
  return -1;
}
/**
* Get the path of the database file.
* @return the path of the database file, or an empty string on failure.
*/
std::string path() {
  _assert_(true);
  // Delegate to the inner database; an empty string signals failure.
  if (omode_ != 0) return db_.path();
  set_error(_KCCODELINE_, Error::INVALID, "not opened");
  return "";
}
/**
* Get the miscellaneous status information.
* @param strmap a string map to contain the result.
* @return true on success, or false on failure.
*/
bool status(std::map<std::string, std::string>* strmap) {
  // The stripped template arguments of std::map<std::string, std::string>
  // are restored here; without them this does not compile.
  _assert_(strmap);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  // Start with the inner database status, then overlay tree-specific data.
  if (!db_.status(strmap)) return false;
  (*strmap)["type"] = strprintf("%u", (unsigned)DBTYPE);
  (*strmap)["psiz"] = strprintf("%d", psiz_);
  (*strmap)["pccap"] = strprintf("%lld", (long long)pccap_);
  // Map the comparator pointer back to a symbolic name; anything that is
  // not a built-in comparator is reported as "external".
  const char* compname = "external";
  if (reccomp_.comp == LEXICALCOMP) {
    compname = "lexical";
  } else if (reccomp_.comp == DECIMALCOMP) {
    compname = "decimal";
  } else if (reccomp_.comp == LEXICALDESCCOMP) {
    compname = "lexicaldesc";
  } else if (reccomp_.comp == DECIMALDESCCOMP) {
    compname = "decimaldesc";
  }
  (*strmap)["rcomp"] = compname;
  (*strmap)["root"] = strprintf("%lld", (long long)root_);
  (*strmap)["first"] = strprintf("%lld", (long long)first_);
  (*strmap)["last"] = strprintf("%lld", (long long)last_);
  (*strmap)["lcnt"] = strprintf("%lld", (long long)lcnt_);
  (*strmap)["icnt"] = strprintf("%lld", (long long)icnt_);
  (*strmap)["count"] = strprintf("%lld", (long long)count_);
  (*strmap)["bnum"] = strprintf("%lld", (long long)bnum_);
  (*strmap)["pnum"] = strprintf("%lld", (long long)db_.count());
  (*strmap)["cusage"] = strprintf("%lld", (long long)cusage_);
  // The following entries are computed only on demand: the caller opts in
  // by pre-inserting the key into the map.
  if (strmap->count("cusage_lcnt") > 0)
    (*strmap)["cusage_lcnt"] = strprintf("%lld", (long long)calc_leaf_cache_count());
  if (strmap->count("cusage_lsiz") > 0)
    (*strmap)["cusage_lsiz"] = strprintf("%lld", (long long)calc_leaf_cache_size());
  if (strmap->count("cusage_icnt") > 0)
    (*strmap)["cusage_icnt"] = strprintf("%lld", (long long)calc_inner_cache_count());
  if (strmap->count("cusage_isiz") > 0)
    (*strmap)["cusage_isiz"] = strprintf("%lld", (long long)calc_inner_cache_size());
  if (strmap->count("tree_level") > 0) {
    // Probe with an empty key to measure the depth of the B+ tree.
    Link link;
    link.ksiz = 0;
    int64_t hist[LEVELMAX];
    int32_t hnum = 0;
    search_tree(&link, false, hist, &hnum);
    (*strmap)["tree_level"] = strprintf("%d", hnum + 1);
  }
  return true;
}
/**
* Create a cursor object.
* @return the return value is the created cursor object.
* @note Because the object of the return value is allocated by the constructor, it should be
* released with the delete operator when it is no longer in use.
*/
Cursor* cursor() {
  _assert_(true);
  // Ownership passes to the caller, who must delete the cursor.
  Cursor* cur = new Cursor(this);
  return cur;
}
/**
* Write a log message.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param message the supplement message.
*/
void log(const char* file, int32_t line, const char* func, Logger::Kind kind,
         const char* message) {
  _assert_(file && line > 0 && func && message);
  // Forward to the inner database's logger.
  db_.log(file, line, func, kind, message);
}
/**
* Set the internal logger.
* @param logger the logger object.
* @param kinds kinds of logged messages by bitwise-or: Logger::DEBUG for debugging,
* Logger::INFO for normal information, Logger::WARN for warning, and Logger::ERROR for fatal
* error.
* @return true on success, or false on failure.
*/
bool tune_logger(Logger* logger, uint32_t kinds = Logger::WARN | Logger::ERROR) {
  _assert_(logger);
  // Tuning is only permitted while the database is closed.
  if (omode_ == 0) return db_.tune_logger(logger, kinds);
  set_error(_KCCODELINE_, Error::INVALID, "already opened");
  return false;
}
/**
* Set the internal meta operation trigger.
* @param trigger the trigger object.
* @return true on success, or false on failure.
*/
bool tune_meta_trigger(MetaTrigger* trigger) {
  _assert_(trigger);
  // Tuning is only permitted while the database is closed.
  if (omode_ == 0) {
    mtrigger_ = trigger;
    return true;
  }
  set_error(_KCCODELINE_, Error::INVALID, "already opened");
  return false;
}
/**
* Set the power of the alignment of record size.
* @param apow the power of the alignment of record size.
* @return true on success, or false on failure.
*/
bool tune_alignment(int8_t apow) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ != 0) {
    set_error(_KCCODELINE_, Error::INVALID, "already opened");
    return false;
  }
  // A negative value selects the built-in default.
  if (apow >= 0) {
    apow_ = apow;
  } else {
    apow_ = DEFAPOW;
  }
  return true;
}
/**
* Set the power of the capacity of the free block pool.
* @param fpow the power of the capacity of the free block pool.
* @return true on success, or false on failure.
*/
bool tune_fbp(int8_t fpow) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ != 0) {
    set_error(_KCCODELINE_, Error::INVALID, "already opened");
    return false;
  }
  // A negative value selects the built-in default.
  if (fpow >= 0) {
    fpow_ = fpow;
  } else {
    fpow_ = DEFFPOW;
  }
  return true;
}
/**
* Set the optional features.
* @param opts the optional features by bitwise-or: BasicDB::TSMALL to use 32-bit addressing,
* BasicDB::TLINEAR to use linear collision chaining, BasicDB::TCOMPRESS to compress each
* record.
* @return true on success, or false on failure.
*/
bool tune_options(int8_t opts) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ == 0) {
    opts_ = opts;
    return true;
  }
  set_error(_KCCODELINE_, Error::INVALID, "already opened");
  return false;
}
/**
* Set the number of buckets of the hash table.
* @param bnum the number of buckets of the hash table.
* @return true on success, or false on failure.
*/
bool tune_buckets(int64_t bnum) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ != 0) {
    set_error(_KCCODELINE_, Error::INVALID, "already opened");
    return false;
  }
  // A non-positive value selects the built-in default.
  if (bnum > 0) {
    bnum_ = bnum;
  } else {
    bnum_ = DEFBNUM;
  }
  return true;
}
/**
* Set the size of each page.
* @param psiz the size of each page.
* @return true on success, or false on failure.
*/
bool tune_page(int32_t psiz) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ != 0) {
    set_error(_KCCODELINE_, Error::INVALID, "already opened");
    return false;
  }
  // A non-positive value selects the built-in default.
  if (psiz > 0) {
    psiz_ = psiz;
  } else {
    psiz_ = DEFPSIZ;
  }
  return true;
}
/**
* Set the size of the internal memory-mapped region.
* @param msiz the size of the internal memory-mapped region.
* @return true on success, or false on failure.
*/
bool tune_map(int64_t msiz) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ == 0) return db_.tune_map(msiz);
  set_error(_KCCODELINE_, Error::INVALID, "already opened");
  return false;
}
/**
* Set the unit step number of auto defragmentation.
* @param dfunit the unit step number of auto defragmentation.
* @return true on success, or false on failure.
*/
bool tune_defrag(int64_t dfunit) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ == 0) return db_.tune_defrag(dfunit);
  set_error(_KCCODELINE_, Error::INVALID, "already opened");
  return false;
}
/**
* Set the capacity size of the page cache.
* @param pccap the capacity size of the page cache.
* @return true on success, or false on failure.
*/
bool tune_page_cache(int64_t pccap) {
  _assert_(true);
  // Tuning is only permitted while the database is closed.
  if (omode_ != 0) {
    set_error(_KCCODELINE_, Error::INVALID, "already opened");
    return false;
  }
  // A non-positive value selects the built-in default.
  if (pccap > 0) {
    pccap_ = pccap;
  } else {
    pccap_ = DEFPCCAP;
  }
  return true;
}
/**
* Set the data compressor.
* @param comp the data compressor object.
* @return true on success, or false on failure.
*/
bool tune_compressor(Compressor* comp) {
  _assert_(comp);
  // Tuning is only permitted while the database is closed.
  if (omode_ == 0) return db_.tune_compressor(comp);
  set_error(_KCCODELINE_, Error::INVALID, "already opened");
  return false;
}
/**
* Set the record comparator.
* @param rcomp the record comparator object.
* @return true on success, or false on failure.
* @note Several built-in comparators are provided. LEXICALCOMP for the default lexical
* comparator. DECIMALCOMP for the decimal comparator. LEXICALDESCCOMP for the lexical
* descending comparator. DECIMALDESCCOMP for the lexical descending comparator.
*/
bool tune_comparator(Comparator* rcomp) {
  _assert_(rcomp);
  // Tuning is only permitted while the database is closed.
  if (omode_ == 0) {
    reccomp_.comp = rcomp;
    return true;
  }
  set_error(_KCCODELINE_, Error::INVALID, "already opened");
  return false;
}
/**
* Get the opaque data.
* @return the pointer to the opaque data region, whose size is 16 bytes.
*/
char* opaque() {
  _assert_(true);
  // The opaque region is owned by the inner database object.
  if (omode_ != 0) return db_.opaque();
  set_error(_KCCODELINE_, Error::INVALID, "not opened");
  return NULL;
}
/**
* Synchronize the opaque data.
* @return true on success, or false on failure.
*/
bool synchronize_opaque() {
  _assert_(true);
  // Delegate to the inner database object.
  if (omode_ != 0) return db_.synchronize_opaque();
  set_error(_KCCODELINE_, Error::INVALID, "not opened");
  return false;
}
/**
* Perform defragmentation of the file.
* @param step the number of steps. If it is not more than 0, the whole region is defraged.
* @return true on success, or false on failure.
*/
bool defrag(int64_t step = 0) {
  _assert_(true);
  if (omode_ == 0) {
    set_error(_KCCODELINE_, Error::INVALID, "not opened");
    return false;
  }
  bool err = false;
  // For a whole-file defrag by a writer, write back dirty nodes first so
  // the inner database sees the current state.
  if (writer_ && step < 1) {
    if (!clean_leaf_cache()) err = true;
    if (!clean_inner_cache()) err = true;
  }
  if (!db_.defrag(step)) err = true;
  return !err;
}
/**
* Get the status flags.
* @return the status flags, or 0 on failure.
*/
uint8_t flags() {
  _assert_(true);
  // Status flags live in the inner database object.
  if (omode_ != 0) return db_.flags();
  set_error(_KCCODELINE_, Error::INVALID, "not opened");
  return 0;
}
/**
* Get the record comparator.
* @return the record comparator object.
*/
Comparator* rcomp() {
  _assert_(true);
  // Expose the comparator currently used for record ordering.
  if (omode_ != 0) return reccomp_.comp;
  set_error(_KCCODELINE_, Error::INVALID, "not opened");
  return 0;
}
protected:
/**
* Report a message for debugging.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param format the printf-like format string.
* @param ... used according to the format string.
*/
void report(const char* file, int32_t line, const char* func, Logger::Kind kind,
            const char* format, ...) {
  _assert_(file && line > 0 && func && format);
  // Collect the variadic arguments and hand them to the inner database.
  va_list ap;
  va_start(ap, format);
  db_.report_valist(file, line, func, kind, format, ap);
  va_end(ap);
}
/**
* Report a message for debugging with variable number of arguments.
* @param file the file name of the program source code.
* @param line the line number of the program source code.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param format the printf-like format string.
* @param ap used according to the format string.
*/
void report_valist(const char* file, int32_t line, const char* func, Logger::Kind kind,
                   const char* format, va_list ap) {
  _assert_(file && line > 0 && func && format);
  // Pure forwarder to the inner database's logging facility.
  db_.report_valist(file, line, func, kind, format, ap);
}
/**
* Report the content of a binary buffer for debugging.
* @param file the file name of the epicenter.
* @param line the line number of the epicenter.
* @param func the function name of the program source code.
* @param kind the kind of the event. Logger::DEBUG for debugging, Logger::INFO for normal
* information, Logger::WARN for warning, and Logger::ERROR for fatal error.
* @param name the name of the information.
* @param buf the binary buffer.
* @param size the size of the binary buffer
*/
void report_binary(const char* file, int32_t line, const char* func, Logger::Kind kind,
                   const char* name, const char* buf, size_t size) {
  _assert_(file && line > 0 && func && name && buf && size <= MEMMAXSIZ);
  // Pure forwarder to the inner database's binary dump facility.
  db_.report_binary(file, line, func, kind, name, buf, size);
}
/**
* Trigger a meta database operation.
* @param kind the kind of the event. MetaTrigger::OPEN for opening, MetaTrigger::CLOSE for
* closing, MetaTrigger::CLEAR for clearing, MetaTrigger::ITERATE for iteration,
* MetaTrigger::SYNCHRONIZE for synchronization, MetaTrigger::BEGINTRAN for beginning
* transaction, MetaTrigger::COMMITTRAN for committing transaction, MetaTrigger::ABORTTRAN
* for aborting transaction, and MetaTrigger::MISC for miscellaneous operations.
* @param message the supplement message.
*/
void trigger_meta(MetaTrigger::Kind kind, const char* message) {
  _assert_(message);
  // Do nothing unless a trigger has been installed via tune_meta_trigger.
  if (!mtrigger_) return;
  mtrigger_->trigger(kind, message);
}
private:
/**
 * Record data.
 * Header only: the key bytes and then the value bytes are stored immediately
 * after this struct within the same allocation.
 */
struct Record {
  uint32_t ksiz;                         ///< size of the key
  uint32_t vsiz;                         ///< size of the value
};
/**
 * Comparator for records.
 * Orders two records by their key bytes, which live directly after each
 * Record header in memory.
 */
struct RecordComparator {
  Comparator* comp;                      ///< comparator applied to the raw key bytes
  /** constructor */
  explicit RecordComparator() : comp(NULL) {}
  /** comparing operator */
  bool operator ()(const Record* const& a, const Record* const& b) const {
    _assert_(true);
    const char* akey = (const char*)a + sizeof(*a);
    const char* bkey = (const char*)b + sizeof(*b);
    return comp->compare(akey, a->ksiz, bkey, b->ksiz) < 0;
  }
};
/**
 * Leaf node of B+ tree.
 * Leaf nodes form a doubly linked list through the prev/next IDs so that
 * cursors can walk records in order without descending the tree.
 */
struct LeafNode {
  int64_t id;                            ///< page ID number
  RecordArray recs;                      ///< sorted array of records
  int64_t size;                          ///< total size of records
  int64_t prev;                          ///< previous leaf node (0 if none)
  int64_t next;                          ///< next leaf node (0 if none)
  bool hot;                              ///< whether in the hot cache
  bool dirty;                            ///< whether to be written back
  bool dead;                             ///< whether to be removed
};
/**
 * Link to a node.
 * Header only: the separator key bytes are stored immediately after this
 * struct within the same allocation.
 */
struct Link {
  int64_t child;                         ///< child node
  int32_t ksiz;                          ///< size of the key
};
/**
 * Comparator for links.
 * Orders two links by their separator key bytes, which live directly after
 * each Link header in memory.
 */
struct LinkComparator {
  Comparator* comp;                      ///< comparator applied to the raw key bytes
  /** constructor */
  explicit LinkComparator() : comp(NULL) {
    _assert_(true);
  }
  /** comparing operator */
  bool operator ()(const Link* const& a, const Link* const& b) const {
    _assert_(true);
    const char* akey = (const char*)a + sizeof(*a);
    const char* bkey = (const char*)b + sizeof(*b);
    return comp->compare(akey, a->ksiz, bkey, b->ksiz) < 0;
  }
};
/**
 * Inner node of B+ tree.
 */
struct InnerNode {
  int64_t id;                            ///< page ID number
  int64_t heir;                          ///< child before the first link
  LinkArray links;                       ///< sorted array of links
  int64_t size;                          ///< total size of links
  bool dirty;                            ///< whether to be written back
  bool dead;                             ///< whether to be removed
};
/**
 * Slot cache of leaf nodes.
 * Two-level LRU: frequently accessed nodes are promoted to the hot cache,
 * the rest stay in the warm cache.
 */
struct LeafSlot {
  Mutex lock;                            ///< lock guarding both caches of this slot
  LeafCache* hot;                        ///< hot cache
  LeafCache* warm;                       ///< warm cache
};
/**
 * Slot cache of inner nodes.
 * Inner nodes use a single cache level (no hot cache).
 */
struct InnerSlot {
  Mutex lock;                            ///< lock guarding the cache of this slot
  InnerCache* warm;                      ///< warm cache
};
/**
 * Scoped visitor.
 * RAII helper: calls Visitor::visit_before on construction and
 * Visitor::visit_after on destruction, so the pair brackets a whole
 * database operation even on early returns.
 */
class ScopedVisitor {
 public:
  /** constructor */
  explicit ScopedVisitor(Visitor* visitor) : visitor_(visitor) {
    _assert_(visitor);
    visitor_->visit_before();
  }
  /** destructor */
  ~ScopedVisitor() {
    _assert_(true);
    visitor_->visit_after();
  }
 private:
  Visitor* visitor_;                     ///< visitor
};
/**
* Open the leaf cache.
*/
void create_leaf_cache() {
_assert_(true);
int64_t bnum = bnum_ / SLOTNUM + 1;
if (bnum < INT8MAX) bnum = INT8MAX;
bnum = nearbyprime(bnum);
for (int32_t i = 0; i < SLOTNUM; i++) {
lslots_[i].hot = new LeafCache(bnum);
lslots_[i].warm = new LeafCache(bnum);
}
}
/**
 * Close the leaf cache.
 * Destroys the caches of every slot in reverse order of creation.
 */
void delete_leaf_cache() {
  _assert_(true);
  for (int32_t i = SLOTNUM - 1; i >= 0; i--) {
    delete lslots_[i].warm;
    delete lslots_[i].hot;
  }
}
/**
 * Remove all leaf nodes from the leaf cache.
 * @param save whether to save dirty nodes.
 * @return true on success, or false on failure.
 */
bool flush_leaf_cache(bool save) {
  _assert_(true);
  bool err = false;
  for (int32_t i = SLOTNUM - 1; i >= 0; i--) {
    LeafSlot* slot = lslots_ + i;
    typename LeafCache::Iterator it = slot->warm->begin();
    typename LeafCache::Iterator itend = slot->warm->end();
    while (it != itend) {
      LeafNode* node = it.value();
      // advance before flushing: flush_leaf_node removes the node from this cache
      ++it;
      if (!flush_leaf_node(node, save)) err = true;
    }
    it = slot->hot->begin();
    itend = slot->hot->end();
    while (it != itend) {
      LeafNode* node = it.value();
      ++it;
      if (!flush_leaf_node(node, save)) err = true;
    }
  }
  return !err;
}
/**
 * Flush a part of the leaf cache.
 * Evicts a single node, preferring the warm cache's LRU entry over the
 * hot cache's. Dirty contents are saved before eviction.
 * @param slot a slot of leaf nodes.
 * @return true on success, or false on failure.
 */
bool flush_leaf_cache_part(LeafSlot* slot) {
  _assert_(slot);
  LeafNode* victim = NULL;
  if (slot->warm->count() > 0) {
    victim = slot->warm->first_value();
  } else if (slot->hot->count() > 0) {
    victim = slot->hot->first_value();
  }
  if (!victim) return true;
  return flush_leaf_node(victim, true);
}
/**
 * Clean all of the leaf cache.
 * Saves every dirty node to the underlying database but keeps the nodes
 * cached (unlike flush_leaf_cache, which also evicts them).
 * @return true on success, or false on failure.
 */
bool clean_leaf_cache() {
  _assert_(true);
  bool err = false;
  for (int32_t i = 0; i < SLOTNUM; i++) {
    LeafSlot* slot = lslots_ + i;
    ScopedMutex lock(&slot->lock);
    typename LeafCache::Iterator it = slot->warm->begin();
    typename LeafCache::Iterator itend = slot->warm->end();
    while (it != itend) {
      LeafNode* node = it.value();
      if (!save_leaf_node(node)) err = true;
      ++it;
    }
    it = slot->hot->begin();
    itend = slot->hot->end();
    while (it != itend) {
      LeafNode* node = it.value();
      if (!save_leaf_node(node)) err = true;
      ++it;
    }
  }
  return !err;
}
/**
 * Clean a part of the leaf cache.
 * Saves a single node (preferring the warm cache's LRU entry) without
 * evicting it from the cache.
 * @param slot a slot of leaf nodes.
 * @return true on success, or false on failure.
 */
bool clean_leaf_cache_part(LeafSlot* slot) {
  _assert_(slot);
  ScopedMutex lock(&slot->lock);
  LeafNode* target = NULL;
  if (slot->warm->count() > 0) {
    target = slot->warm->first_value();
  } else if (slot->hot->count() > 0) {
    target = slot->hot->first_value();
  }
  return target ? save_leaf_node(target) : true;
}
/**
 * Create a new leaf node.
 * The node is registered in the warm cache of its slot and starts dirty so
 * it will be written back on the next save.
 * @param prev the ID of the previous node.
 * @param next the ID of the next node.
 * @return the created leaf node.
 */
LeafNode* create_leaf_node(int64_t prev, int64_t next) {
  _assert_(true);
  LeafNode* node = new LeafNode;
  node->id = ++lcnt_;
  // base size accounts for the serialized prev/next header fields
  node->size = sizeof(int32_t) * 2;
  node->recs.reserve(DEFLINUM);
  node->prev = prev;
  node->next = next;
  node->hot = false;
  node->dirty = true;
  node->dead = false;
  int32_t sidx = node->id % SLOTNUM;
  LeafSlot* slot = lslots_ + sidx;
  slot->warm->set(node->id, node, LeafCache::MLAST);
  cusage_ += node->size;
  return node;
}
/**
 * Remove a leaf node from the cache.
 * Optionally saves the node first, then frees all of its records and the
 * node itself, and removes it from whichever cache currently holds it.
 * @param node the leaf node.
 * @param save whether to save dirty node.
 * @return true on success, or false on failure.
 */
bool flush_leaf_node(LeafNode* node, bool save) {
  _assert_(node);
  bool err = false;
  if (save && !save_leaf_node(node)) err = true;
  typename RecordArray::const_iterator rit = node->recs.begin();
  typename RecordArray::const_iterator ritend = node->recs.end();
  while (rit != ritend) {
    Record* rec = *rit;
    xfree(rec);
    ++rit;
  }
  int32_t sidx = node->id % SLOTNUM;
  LeafSlot* slot = lslots_ + sidx;
  // the hot flag tells which cache owns the node
  if (node->hot) {
    slot->hot->remove(node->id);
  } else {
    slot->warm->remove(node->id);
  }
  cusage_ -= node->size;
  delete node;
  return !err;
}
/**
 * Save a leaf node.
 * Serializes the node as: prev ID, next ID, then (ksiz, vsiz, key, value)
 * for each record, and stores it under its page key in the underlying DB.
 * Dead nodes are removed from the DB instead. No-op for clean nodes.
 * @param node the leaf node.
 * @return true on success, or false on failure.
 */
bool save_leaf_node(LeafNode* node) {
  _assert_(node);
  if (!node->dirty) return true;
  bool err = false;
  char hbuf[NUMBUFSIZ];
  size_t hsiz = write_key(hbuf, LNPREFIX, node->id);
  if (node->dead) {
    // a record already missing from the DB is not an error here
    if (!db_.remove(hbuf, hsiz) && db_.error().code() != Error::NOREC) err = true;
  } else {
    char* rbuf = new char[node->size];
    char* wp = rbuf;
    wp += writevarnum(wp, node->prev);
    wp += writevarnum(wp, node->next);
    typename RecordArray::const_iterator rit = node->recs.begin();
    typename RecordArray::const_iterator ritend = node->recs.end();
    while (rit != ritend) {
      Record* rec = *rit;
      wp += writevarnum(wp, rec->ksiz);
      wp += writevarnum(wp, rec->vsiz);
      char* dbuf = (char*)rec + sizeof(*rec);
      std::memcpy(wp, dbuf, rec->ksiz);
      wp += rec->ksiz;
      std::memcpy(wp, dbuf + rec->ksiz, rec->vsiz);
      wp += rec->vsiz;
      ++rit;
    }
    if (!db_.set(hbuf, hsiz, rbuf, wp - rbuf)) err = true;
    delete[] rbuf;
  }
  node->dirty = false;
  return !err;
}
/**
 * Load a leaf node.
 * Looks the node up in the hot cache, then the warm cache (optionally
 * promoting it to hot), and finally deserializes it from the underlying
 * database on a cache miss.
 * @param id the ID number of the leaf node.
 * @param prom whether to promote the warm cache.
 * @return the loaded leaf node, or NULL on failure.
 */
LeafNode* load_leaf_node(int64_t id, bool prom) {
  _assert_(id > 0);
  int32_t sidx = id % SLOTNUM;
  LeafSlot* slot = lslots_ + sidx;
  ScopedMutex lock(&slot->lock);
  // fast path: the node is already hot
  LeafNode** np = slot->hot->get(id, LeafCache::MLAST);
  if (np) return *np;
  if (prom) {
    // keep the hot cache from outgrowing the warm cache by demoting its LRU node
    if (slot->hot->count() * WARMRATIO > slot->warm->count() + WARMRATIO) {
      slot->hot->first_value()->hot = false;
      slot->hot->migrate(slot->hot->first_key(), slot->warm, LeafCache::MLAST);
    }
    np = slot->warm->migrate(id, slot->hot, LeafCache::MLAST);
    if (np) {
      (*np)->hot = true;
      return *np;
    }
  } else {
    // no promotion: reuse the outer pointer (the original shadowed it here)
    np = slot->warm->get(id, LeafCache::MLAST);
    if (np) return *np;
  }
  // cache miss: read and parse the serialized page from the underlying database
  char hbuf[NUMBUFSIZ];
  size_t hsiz = write_key(hbuf, LNPREFIX, id);
  class VisitorImpl : public DB::Visitor {
   public:
    explicit VisitorImpl() : node_(NULL) {}
    LeafNode* pop() {
      return node_;
    }
   private:
    const char* visit_full(const char* kbuf, size_t ksiz,
                           const char* vbuf, size_t vsiz, size_t* sp) {
      // page layout: prev ID, next ID, then (ksiz, vsiz, key, value) tuples
      uint64_t prev;
      size_t step = readvarnum(vbuf, vsiz, &prev);
      if (step < 1) return NOP;
      vbuf += step;
      vsiz -= step;
      uint64_t next;
      step = readvarnum(vbuf, vsiz, &next);
      if (step < 1) return NOP;
      vbuf += step;
      vsiz -= step;
      LeafNode* node = new LeafNode;
      node->size = sizeof(int32_t) * 2;
      node->prev = prev;
      node->next = next;
      while (vsiz > 1) {
        uint64_t rksiz;
        step = readvarnum(vbuf, vsiz, &rksiz);
        if (step < 1) break;
        vbuf += step;
        vsiz -= step;
        uint64_t rvsiz;
        step = readvarnum(vbuf, vsiz, &rvsiz);
        if (step < 1) break;
        vbuf += step;
        vsiz -= step;
        if (vsiz < rksiz + rvsiz) break;
        size_t rsiz = sizeof(Record) + rksiz + rvsiz;
        Record* rec = (Record*)xmalloc(rsiz);
        rec->ksiz = rksiz;
        rec->vsiz = rvsiz;
        char* dbuf = (char*)rec + sizeof(*rec);
        std::memcpy(dbuf, vbuf, rksiz);
        vbuf += rksiz;
        vsiz -= rksiz;
        std::memcpy(dbuf + rksiz, vbuf, rvsiz);
        vbuf += rvsiz;
        vsiz -= rvsiz;
        node->recs.push_back(rec);
        node->size += rsiz;
      }
      if (vsiz != 0) {
        // leftover bytes mean the page is corrupt: discard everything parsed
        typename RecordArray::const_iterator rit = node->recs.begin();
        typename RecordArray::const_iterator ritend = node->recs.end();
        while (rit != ritend) {
          Record* rec = *rit;
          xfree(rec);
          ++rit;
        }
        delete node;
        return NOP;
      }
      node_ = node;
      return NOP;
    }
    LeafNode* node_;
  } visitor;
  if (!db_.accept(hbuf, hsiz, &visitor, false)) return NULL;
  LeafNode* node = visitor.pop();
  if (!node) return NULL;
  node->id = id;
  node->hot = false;
  node->dirty = false;
  node->dead = false;
  slot->warm->set(id, node, LeafCache::MLAST);
  cusage_ += node->size;
  return node;
}
/**
 * Check whether a record is in the range of a leaf node.
 * A record is in range when it is neither below the first record nor above
 * the last record of the node, under the record comparator.
 * @param node the leaf node.
 * @param rec the record containing the key only.
 * @return true for in range, or false for out of range.
 */
bool check_leaf_node_range(LeafNode* node, Record* rec) {
  _assert_(node && rec);
  const RecordArray& recs = node->recs;
  if (recs.empty()) return false;
  if (reccomp_(rec, recs.front())) return false;
  if (reccomp_(recs.back(), rec)) return false;
  return true;
}
/**
 * Accept a visitor at a leaf node.
 * Binary-searches the node's sorted record array for the key and invokes
 * visit_full (key found) or visit_empty (key absent), applying the
 * visitor's verdict (remove / replace / insert / no-op) in place.
 * @param node the leaf node.
 * @param rec the record containing the key only.
 * @param visitor a visitor object.
 * @return true to reorganize the tree, or false if not.
 */
bool accept_impl(LeafNode* node, Record* rec, Visitor* visitor) {
  _assert_(node && rec && visitor);
  bool reorg = false;
  RecordArray& recs = node->recs;
  typename RecordArray::iterator ritend = recs.end();
  typename RecordArray::iterator rit = std::lower_bound(recs.begin(), ritend, rec, reccomp_);
  if (rit != ritend && !reccomp_(rec, *rit)) {
    // key found: this local deliberately shadows the search-key parameter
    Record* rec = *rit;
    char* kbuf = (char*)rec + sizeof(*rec);
    size_t ksiz = rec->ksiz;
    size_t vsiz;
    const char* vbuf = visitor->visit_full(kbuf, ksiz, kbuf + ksiz, rec->vsiz, &vsiz);
    if (vbuf == Visitor::REMOVE) {
      // remove the record; an emptied node triggers tree reorganization
      size_t rsiz = sizeof(*rec) + rec->ksiz + rec->vsiz;
      count_ -= 1;
      cusage_ -= rsiz;
      node->size -= rsiz;
      node->dirty = true;
      xfree(rec);
      recs.erase(rit);
      if (recs.empty()) reorg = true;
    } else if (vbuf != Visitor::NOP) {
      // replace the value, growing the record allocation when necessary
      int64_t diff = (int64_t)vsiz - (int64_t)rec->vsiz;
      cusage_ += diff;
      node->size += diff;
      node->dirty = true;
      if (vsiz > rec->vsiz) {
        *rit = (Record*)xrealloc(rec, sizeof(*rec) + rec->ksiz + vsiz);
        rec = *rit;
        kbuf = (char*)rec + sizeof(*rec);
      }
      std::memcpy(kbuf + rec->ksiz, vbuf, vsiz);
      rec->vsiz = vsiz;
      // an oversized node with more than one record triggers a split
      if (node->size > psiz_ && recs.size() > 1) reorg = true;
    }
  } else {
    // key absent: the visitor may supply a value to insert
    const char* kbuf = (char*)rec + sizeof(*rec);
    size_t ksiz = rec->ksiz;
    size_t vsiz;
    const char* vbuf = visitor->visit_empty(kbuf, ksiz, &vsiz);
    if (vbuf != Visitor::NOP && vbuf != Visitor::REMOVE) {
      size_t rsiz = sizeof(*rec) + ksiz + vsiz;
      count_ += 1;
      cusage_ += rsiz;
      node->size += rsiz;
      node->dirty = true;
      rec = (Record*)xmalloc(rsiz);
      rec->ksiz = ksiz;
      rec->vsiz = vsiz;
      char* dbuf = (char*)rec + sizeof(*rec);
      std::memcpy(dbuf, kbuf, ksiz);
      std::memcpy(dbuf + ksiz, vbuf, vsiz);
      recs.insert(rit, rec);
      if (node->size > psiz_ && recs.size() > 1) reorg = true;
    }
  }
  return reorg;
}
/**
 * Divide a leaf node into two.
 * Moves the upper half of the records into a freshly created sibling that
 * is linked in after the original node, and relocates affected cursors.
 * @param node the leaf node.
 * @return the created node, or NULL on failure.
 */
LeafNode* divide_leaf_node(LeafNode* node) {
  _assert_(node);
  LeafNode* newnode = create_leaf_node(node->id, node->next);
  if (newnode->next > 0) {
    // fix the back-link of the old successor to point at the new sibling
    LeafNode* nextnode = load_leaf_node(newnode->next, false);
    if (!nextnode) {
      set_error(_KCCODELINE_, Error::BROKEN, "missing leaf node");
      db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)newnode->next);
      return NULL;
    }
    nextnode->prev = newnode->id;
    nextnode->dirty = true;
  }
  node->next = newnode->id;
  node->dirty = true;
  // move the upper half of the records to the new node
  RecordArray& recs = node->recs;
  typename RecordArray::iterator mid = recs.begin() + recs.size() / 2;
  typename RecordArray::iterator rit = mid;
  typename RecordArray::iterator ritend = recs.end();
  RecordArray& newrecs = newnode->recs;
  while (rit != ritend) {
    Record* rec = *rit;
    newrecs.push_back(rec);
    size_t rsiz = sizeof(*rec) + rec->ksiz + rec->vsiz;
    node->size -= rsiz;
    newnode->size += rsiz;
    ++rit;
  }
  // cursors at or beyond the pivot record must follow their records
  escape_cursors(node->id, node->next, *mid);
  recs.erase(mid, ritend);
  return newnode;
}
/**
* Open the inner cache.
*/
void create_inner_cache() {
_assert_(true);
int64_t bnum = (bnum_ / AVGWAY) / SLOTNUM + 1;
if (bnum < INT8MAX) bnum = INT8MAX;
bnum = nearbyprime(bnum);
for (int32_t i = 0; i < SLOTNUM; i++) {
islots_[i].warm = new InnerCache(bnum);
}
}
/**
 * Close the inner cache.
 * Destroys the cache of every slot in reverse order of creation.
 */
void delete_inner_cache() {
  _assert_(true);
  for (int32_t i = SLOTNUM - 1; i >= 0; i--) {
    delete islots_[i].warm;
  }
}
/**
 * Remove all inner nodes from the inner cache.
 * @param save whether to save dirty nodes.
 * @return true on success, or false on failure.
 */
bool flush_inner_cache(bool save) {
  _assert_(true);
  bool err = false;
  for (int32_t i = SLOTNUM - 1; i >= 0; i--) {
    InnerSlot* slot = islots_ + i;
    typename InnerCache::Iterator it = slot->warm->begin();
    typename InnerCache::Iterator itend = slot->warm->end();
    while (it != itend) {
      InnerNode* node = it.value();
      // advance before flushing: flush_inner_node removes the node from this cache
      ++it;
      if (!flush_inner_node(node, save)) err = true;
    }
  }
  return !err;
}
/**
 * Flush a part of the inner cache.
 * Evicts the slot's LRU inner node, saving its dirty contents first.
 * @param slot a slot of inner nodes.
 * @return true on success, or false on failure.
 */
bool flush_inner_cache_part(InnerSlot* slot) {
  _assert_(slot);
  if (slot->warm->count() < 1) return true;
  return flush_inner_node(slot->warm->first_value(), true);
}
/**
 * Clean all of the inner cache.
 * Saves every dirty node to the underlying database but keeps the nodes
 * cached (unlike flush_inner_cache, which also evicts them).
 * @return true on success, or false on failure.
 */
bool clean_inner_cache() {
  _assert_(true);
  bool err = false;
  for (int32_t i = 0; i < SLOTNUM; i++) {
    InnerSlot* slot = islots_ + i;
    ScopedMutex lock(&slot->lock);
    typename InnerCache::Iterator it = slot->warm->begin();
    typename InnerCache::Iterator itend = slot->warm->end();
    while (it != itend) {
      InnerNode* node = it.value();
      if (!save_inner_node(node)) err = true;
      ++it;
    }
  }
  return !err;
}
/**
 * Create a new inner node.
 * The node is registered in the warm cache of its slot and starts dirty.
 * @param heir the ID of the child before the first link.
 * @return the created inner node.
 */
InnerNode* create_inner_node(int64_t heir) {
  _assert_(true);
  InnerNode* node = new InnerNode;
  // inner node IDs are offset by INIDBASE to keep them disjoint from leaf IDs
  node->id = ++icnt_ + INIDBASE;
  node->heir = heir;
  node->links.reserve(DEFIINUM);
  // base size accounts for the serialized heir field
  node->size = sizeof(int64_t);
  node->dirty = true;
  node->dead = false;
  int32_t sidx = node->id % SLOTNUM;
  InnerSlot* slot = islots_ + sidx;
  slot->warm->set(node->id, node, InnerCache::MLAST);
  cusage_ += node->size;
  return node;
}
/**
 * Remove an inner node from the cache.
 * Optionally saves the node first, then frees all of its links and the
 * node itself, and removes it from its slot's cache.
 * @param node the inner node.
 * @param save whether to save dirty node.
 * @return true on success, or false on failure.
 */
bool flush_inner_node(InnerNode* node, bool save) {
  _assert_(node);
  bool err = false;
  if (save && !save_inner_node(node)) err = true;
  typename LinkArray::const_iterator lit = node->links.begin();
  typename LinkArray::const_iterator litend = node->links.end();
  while (lit != litend) {
    Link* link = *lit;
    xfree(link);
    ++lit;
  }
  int32_t sidx = node->id % SLOTNUM;
  InnerSlot* slot = islots_ + sidx;
  slot->warm->remove(node->id);
  cusage_ -= node->size;
  delete node;
  return !err;
}
/**
 * Save an inner node.
 * Serializes the node as: heir ID, then (child ID, ksiz, key) for each
 * link, and stores it under its page key in the underlying DB. Dead nodes
 * are removed from the DB instead. No-op for clean nodes.
 * @param node the inner node.
 * @return true on success, or false on failure.
 */
bool save_inner_node(InnerNode* node) {
  // assert the pointer like save_leaf_node does (was _assert_(true))
  _assert_(node);
  if (!node->dirty) return true;
  bool err = false;
  char hbuf[NUMBUFSIZ];
  size_t hsiz = write_key(hbuf, INPREFIX, node->id - INIDBASE);
  if (node->dead) {
    // a record already missing from the DB is not an error here
    if (!db_.remove(hbuf, hsiz) && db_.error().code() != Error::NOREC) err = true;
  } else {
    char* rbuf = new char[node->size];
    char* wp = rbuf;
    wp += writevarnum(wp, node->heir);
    typename LinkArray::const_iterator lit = node->links.begin();
    typename LinkArray::const_iterator litend = node->links.end();
    while (lit != litend) {
      Link* link = *lit;
      wp += writevarnum(wp, link->child);
      wp += writevarnum(wp, link->ksiz);
      char* dbuf = (char*)link + sizeof(*link);
      std::memcpy(wp, dbuf, link->ksiz);
      wp += link->ksiz;
      ++lit;
    }
    if (!db_.set(hbuf, hsiz, rbuf, wp - rbuf)) err = true;
    delete[] rbuf;
  }
  node->dirty = false;
  return !err;
}
/**
 * Load an inner node.
 * Looks the node up in its slot's warm cache, falling back to
 * deserializing the page from the underlying database on a miss.
 * @param id the ID number of the inner node.
 * @return the loaded inner node, or NULL on failure.
 */
InnerNode* load_inner_node(int64_t id) {
  _assert_(id > 0);
  int32_t sidx = id % SLOTNUM;
  InnerSlot* slot = islots_ + sidx;
  ScopedMutex lock(&slot->lock);
  InnerNode** np = slot->warm->get(id, InnerCache::MLAST);
  if (np) return *np;
  char hbuf[NUMBUFSIZ];
  size_t hsiz = write_key(hbuf, INPREFIX, id - INIDBASE);
  class VisitorImpl : public DB::Visitor {
   public:
    explicit VisitorImpl() : node_(NULL) {}
    InnerNode* pop() {
      return node_;
    }
   private:
    const char* visit_full(const char* kbuf, size_t ksiz,
                           const char* vbuf, size_t vsiz, size_t* sp) {
      // page layout: heir ID, then (child ID, ksiz, key) tuples
      uint64_t heir;
      size_t step = readvarnum(vbuf, vsiz, &heir);
      if (step < 1) return NOP;
      vbuf += step;
      vsiz -= step;
      InnerNode* node = new InnerNode;
      node->size = sizeof(int64_t);
      node->heir = heir;
      while (vsiz > 1) {
        uint64_t child;
        step = readvarnum(vbuf, vsiz, &child);
        if (step < 1) break;
        vbuf += step;
        vsiz -= step;
        uint64_t rksiz;
        step = readvarnum(vbuf, vsiz, &rksiz);
        if (step < 1) break;
        vbuf += step;
        vsiz -= step;
        if (vsiz < rksiz) break;
        Link* link = (Link*)xmalloc(sizeof(*link) + rksiz);
        link->child = child;
        link->ksiz = rksiz;
        char* dbuf = (char*)link + sizeof(*link);
        std::memcpy(dbuf, vbuf, rksiz);
        vbuf += rksiz;
        vsiz -= rksiz;
        node->links.push_back(link);
        node->size += sizeof(*link) + rksiz;
      }
      if (vsiz != 0) {
        // leftover bytes mean the page is corrupt: discard everything parsed
        typename LinkArray::const_iterator lit = node->links.begin();
        typename LinkArray::const_iterator litend = node->links.end();
        while (lit != litend) {
          Link* link = *lit;
          xfree(link);
          ++lit;
        }
        delete node;
        return NOP;
      }
      node_ = node;
      return NOP;
    }
    InnerNode* node_;
  } visitor;
  if (!db_.accept(hbuf, hsiz, &visitor, false)) return NULL;
  InnerNode* node = visitor.pop();
  if (!node) return NULL;
  node->id = id;
  node->dirty = false;
  node->dead = false;
  slot->warm->set(id, node, InnerCache::MLAST);
  cusage_ += node->size;
  return node;
}
/**
 * Search the B+ tree.
 * Descends from the root through inner nodes, choosing at each level the
 * child whose key range covers the search key, and records the IDs of the
 * visited inner nodes so callers can reorganize the path later.
 * @param link the link containing the key only.
 * @param prom whether to promote the warm cache.
 * @param hist the array of visiting history.
 * @param hnp the pointer to the variable into which the number of the history is assigned.
 * @return the corresponding leaf node, or NULL on failure.
 */
LeafNode* search_tree(Link* link, bool prom, int64_t* hist, int32_t* hnp) {
  _assert_(link && hist && hnp);
  int64_t id = root_;
  int32_t hnum = 0;
  // IDs above INIDBASE denote inner nodes; anything else is a leaf
  while (id > INIDBASE) {
    InnerNode* node = load_inner_node(id);
    if (!node) {
      set_error(_KCCODELINE_, Error::BROKEN, "missing inner node");
      db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)id);
      return NULL;
    }
    hist[hnum++] = id;
    const LinkArray& links = node->links;
    typename LinkArray::const_iterator litbeg = links.begin();
    typename LinkArray::const_iterator litend = links.end();
    // first link strictly greater than the key; the child to follow precedes it
    typename LinkArray::const_iterator lit = std::upper_bound(litbeg, litend, link, linkcomp_);
    if (lit == litbeg) {
      id = node->heir;
    } else {
      --lit;
      // this local deliberately shadows the search-key parameter
      Link* link = *lit;
      id = link->child;
    }
  }
  *hnp = hnum;
  return load_leaf_node(id, prom);
}
/**
 * Reorganize the B+ tree.
 * Handles the two structural cases after a leaf mutation: splitting an
 * oversized leaf (propagating separator keys upward, possibly growing a
 * new root) or unlinking an emptied leaf from its parent and siblings.
 * @param node a leaf node.
 * @param hist the array of visiting history.
 * @param hnum the number of the history.
 * @return true on success, or false on failure.
 */
bool reorganize_tree(LeafNode* node, int64_t* hist, int32_t hnum) {
  _assert_(node && hist && hnum >= 0);
  if (node->size > psiz_ && node->recs.size() > 1) {
    // case 1: the leaf overflowed the page size and must be divided
    LeafNode* newnode = divide_leaf_node(node);
    if (!newnode) return false;
    if (node->id == last_) last_ = newnode->id;
    int64_t heir = node->id;
    int64_t child = newnode->id;
    // the first record of the new node becomes the separator key
    Record* rec = *newnode->recs.begin();
    char* dbuf = (char*)rec + sizeof(*rec);
    int32_t ksiz = rec->ksiz;
    char* kbuf = new char[ksiz];
    std::memcpy(kbuf, dbuf, ksiz);
    // insert the separator, splitting ancestor nodes upward as needed
    while (true) {
      if (hnum < 1) {
        // the root itself split: grow the tree with a new root node
        InnerNode* inode = create_inner_node(heir);
        add_link_inner_node(inode, child, kbuf, ksiz);
        root_ = inode->id;
        delete[] kbuf;
        break;
      }
      int64_t parent = hist[--hnum];
      InnerNode* inode = load_inner_node(parent);
      if (!inode) {
        set_error(_KCCODELINE_, Error::BROKEN, "missing inner node");
        db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)parent);
        delete[] kbuf;
        return false;
      }
      add_link_inner_node(inode, child, kbuf, ksiz);
      delete[] kbuf;
      LinkArray& links = inode->links;
      if (inode->size <= psiz_ || links.size() <= INLINKMIN) break;
      // the parent overflowed too: split it and continue one level up
      typename LinkArray::iterator litbeg = links.begin();
      typename LinkArray::iterator mid = litbeg + links.size() / 2;
      Link* link = *mid;
      InnerNode* newinode = create_inner_node(link->child);
      heir = inode->id;
      child = newinode->id;
      // the middle link's key is promoted as the next separator
      char* dbuf = (char*)link + sizeof(*link);
      ksiz = link->ksiz;
      kbuf = new char[ksiz];
      std::memcpy(kbuf, dbuf, ksiz);
      typename LinkArray::iterator lit = mid + 1;
      typename LinkArray::iterator litend = links.end();
      while (lit != litend) {
        link = *lit;
        char* dbuf = (char*)link + sizeof(*link);
        add_link_inner_node(newinode, link->child, dbuf, link->ksiz);
        ++lit;
      }
      // drop the moved links plus the promoted middle link (hence <=)
      int32_t num = newinode->links.size();
      for (int32_t i = 0; i <= num; i++) {
        Link* link = links.back();
        size_t rsiz = sizeof(*link) + link->ksiz;
        cusage_ -= rsiz;
        inode->size -= rsiz;
        xfree(link);
        links.pop_back();
      }
      inode->dirty = true;
    }
  } else if (node->recs.empty() && hnum > 0) {
    // case 2: the leaf became empty and is unlinked from the tree
    if (!escape_cursors(node->id, node->next)) return false;
    InnerNode* inode = load_inner_node(hist[--hnum]);
    if (!inode) {
      set_error(_KCCODELINE_, Error::BROKEN, "missing inner node");
      db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)hist[hnum]);
      return false;
    }
    if (sub_link_tree(inode, node->id, hist, hnum)) {
      // stitch the sibling leaf list together across the removed node
      if (node->prev > 0) {
        LeafNode* tnode = load_leaf_node(node->prev, false);
        if (!tnode) {
          set_error(_KCCODELINE_, Error::BROKEN, "missing node");
          db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)node->prev);
          return false;
        }
        tnode->next = node->next;
        tnode->dirty = true;
        if (last_ == node->id) last_ = node->prev;
      }
      if (node->next > 0) {
        LeafNode* tnode = load_leaf_node(node->next, false);
        if (!tnode) {
          set_error(_KCCODELINE_, Error::BROKEN, "missing node");
          db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)node->next);
          return false;
        }
        tnode->prev = node->prev;
        tnode->dirty = true;
        if (first_ == node->id) first_ = node->next;
      }
      node->dead = true;
    }
  }
  return true;
}
/**
 * Add a link to an inner node.
 * Allocates a Link with the key bytes stored inline after the header and
 * inserts it into the node's sorted link array.
 * @param node the inner node.
 * @param child the ID number of the child.
 * @param kbuf the pointer to the key region.
 * @param ksiz the size of the key region.
 */
void add_link_inner_node(InnerNode* node, int64_t child, const char* kbuf, size_t ksiz) {
  _assert_(node && kbuf);
  size_t rsiz = sizeof(Link) + ksiz;
  Link* link = (Link*)xmalloc(rsiz);
  link->child = child;
  link->ksiz = ksiz;
  char* dbuf = (char*)link + sizeof(*link);
  std::memcpy(dbuf, kbuf, ksiz);
  LinkArray& links = node->links;
  // upper_bound keeps equal keys in insertion order
  typename LinkArray::iterator litend = links.end();
  typename LinkArray::iterator lit = std::upper_bound(links.begin(), litend, link, linkcomp_);
  links.insert(lit, link);
  node->size += rsiz;
  node->dirty = true;
  cusage_ += rsiz;
}
/**
 * Subtract a link from the B+ tree.
 * Removes the link to a given child from an inner node. If the node
 * becomes useless it is marked dead and the removal recurses upward; a
 * dead chain reaching the root shrinks the tree height.
 * @param node the inner node.
 * @param child the ID number of the child.
 * @param hist the array of visiting history.
 * @param hnum the number of the history.
 * @return true on success, or false on failure.
 */
bool sub_link_tree(InnerNode* node, int64_t child, int64_t* hist, int32_t hnum) {
  _assert_(node && hist && hnum >= 0);
  node->dirty = true;
  LinkArray& links = node->links;
  typename LinkArray::iterator lit = links.begin();
  typename LinkArray::iterator litend = links.end();
  if (node->heir == child) {
    if (!links.empty()) {
      // promote the first link's child to be the new heir
      Link* link = *lit;
      node->heir = link->child;
      xfree(link);
      links.erase(lit);
      return true;
    } else if (hnum > 0) {
      // the node has no links left: remove it from its own parent too
      InnerNode* pnode = load_inner_node(hist[--hnum]);
      if (!pnode) {
        set_error(_KCCODELINE_, Error::BROKEN, "missing inner node");
        db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)hist[hnum]);
        return false;
      }
      node->dead = true;
      return sub_link_tree(pnode, node->id, hist, hnum);
    }
    // the root lost its last child: collapse dead nodes to find the new root
    node->dead = true;
    root_ = child;
    while (child > INIDBASE) {
      node = load_inner_node(child);
      if (!node) {
        set_error(_KCCODELINE_, Error::BROKEN, "missing inner node");
        db_.report(_KCCODELINE_, Logger::WARN, "id=%lld", (long long)child);
        return false;
      }
      if (node->dead) {
        child = node->heir;
        root_ = child;
      } else {
        child = 0;
      }
    }
    // false tells the caller not to splice the leaf list (tree was rebuilt)
    return false;
  }
  while (lit != litend) {
    Link* link = *lit;
    if (link->child == child) {
      xfree(link);
      links.erase(lit);
      return true;
    }
    ++lit;
  }
  // the child was neither the heir nor any link: structural corruption
  set_error(_KCCODELINE_, Error::BROKEN, "invalid tree");
  return false;
}
/**
 * Dump the meta data into the file.
 * Layout: one comparator tag byte at offset 0 (0xff means a custom
 * comparator that must be re-supplied at open time), then at MOFFNUMS a
 * sequence of big-endian 64-bit fields (psiz, root, first, last, lcnt,
 * icnt, count, bnum) followed by an 8-byte magic trailer ("\nBoofy!\n").
 * @return true on success, or false on failure.
 */
bool dump_meta() {
  _assert_(true);
  char head[HEADSIZ];
  std::memset(head, 0, sizeof(head));
  char* wp = head;
  if (reccomp_.comp == LEXICALCOMP) {
    *(uint8_t*)(wp++) = 0x10;
  } else if (reccomp_.comp == DECIMALCOMP) {
    *(uint8_t*)(wp++) = 0x11;
  } else if (reccomp_.comp == LEXICALDESCCOMP) {
    *(uint8_t*)(wp++) = 0x18;
  } else if (reccomp_.comp == DECIMALDESCCOMP) {
    *(uint8_t*)(wp++) = 0x19;
  } else {
    // custom comparator: cannot be serialized, only tagged
    *(uint8_t*)(wp++) = 0xff;
  }
  wp = head + MOFFNUMS;
  uint64_t num = hton64(psiz_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  num = hton64(root_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  num = hton64(first_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  num = hton64(last_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  num = hton64(lcnt_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  num = hton64(icnt_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  num = hton64(count_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  num = hton64(bnum_);
  std::memcpy(wp, &num, sizeof(num));
  wp += sizeof(num);
  // 8-byte magic trailer: "\nBoofy!\n"
  std::memcpy(wp, "\x0a\x42\x6f\x6f\x66\x79\x21\x0a", sizeof(num));
  wp += sizeof(num);
  if (!db_.set(KCPDBMETAKEY, sizeof(KCPDBMETAKEY) - 1, head, sizeof(head))) return false;
  // remember the state at the last sync for transaction bookkeeping
  trlcnt_ = lcnt_;
  trcount_ = count_;
  return true;
}
/**
 * Load the meta data from the file.
 * Inverse of dump_meta: reads the comparator tag byte and the sequence of
 * big-endian 64-bit fields into the corresponding members.
 * @return true on success, or false on failure.
 */
bool load_meta() {
  _assert_(true);
  char head[HEADSIZ];
  int32_t hsiz = db_.get(KCPDBMETAKEY, sizeof(KCPDBMETAKEY) - 1, head, sizeof(head));
  if (hsiz < 0) return false;
  if (hsiz != sizeof(head)) {
    set_error(_KCCODELINE_, Error::BROKEN, "invalid meta data record");
    db_.report(_KCCODELINE_, Logger::WARN, "hsiz=%d", hsiz);
    return false;
  }
  const char* rp = head;
  if (*(uint8_t*)rp == 0x10) {
    reccomp_.comp = LEXICALCOMP;
    linkcomp_.comp = LEXICALCOMP;
  } else if (*(uint8_t*)rp == 0x11) {
    reccomp_.comp = DECIMALCOMP;
    linkcomp_.comp = DECIMALCOMP;
  } else if (*(uint8_t*)rp == 0x18) {
    reccomp_.comp = LEXICALDESCCOMP;
    linkcomp_.comp = LEXICALDESCCOMP;
  } else if (*(uint8_t*)rp == 0x19) {
    reccomp_.comp = DECIMALDESCCOMP;
    linkcomp_.comp = DECIMALDESCCOMP;
  } else if (*(uint8_t*)rp == 0xff) {
    // 0xff means the DB was built with a custom comparator, which the
    // caller must have supplied before opening
    if (!reccomp_.comp) {
      set_error(_KCCODELINE_, Error::INVALID, "the custom comparator is not given");
      return false;
    }
    linkcomp_.comp = reccomp_.comp;
  } else {
    set_error(_KCCODELINE_, Error::BROKEN, "comparator is invalid");
    return false;
  }
  rp = head + MOFFNUMS;
  uint64_t num;
  std::memcpy(&num, rp, sizeof(num));
  psiz_ = ntoh64(num);
  rp += sizeof(num);
  std::memcpy(&num, rp, sizeof(num));
  root_ = ntoh64(num);
  rp += sizeof(num);
  std::memcpy(&num, rp, sizeof(num));
  first_ = ntoh64(num);
  rp += sizeof(num);
  std::memcpy(&num, rp, sizeof(num));
  last_ = ntoh64(num);
  rp += sizeof(num);
  std::memcpy(&num, rp, sizeof(num));
  lcnt_ = ntoh64(num);
  rp += sizeof(num);
  std::memcpy(&num, rp, sizeof(num));
  icnt_ = ntoh64(num);
  rp += sizeof(num);
  std::memcpy(&num, rp, sizeof(num));
  count_ = ntoh64(num);
  rp += sizeof(num);
  std::memcpy(&num, rp, sizeof(num));
  bnum_ = ntoh64(num);
  rp += sizeof(num);
  // remember the state at the last sync for transaction bookkeeping
  trlcnt_ = lcnt_;
  trcount_ = count_;
  return true;
}
/**
 * Calculate the total number of nodes in the leaf cache.
 * @return the total number of nodes in the leaf cache.
 */
int64_t calc_leaf_cache_count() {
  _assert_(true);
  int64_t total = 0;
  for (int32_t i = 0; i < SLOTNUM; i++) {
    total += lslots_[i].warm->count() + lslots_[i].hot->count();
  }
  return total;
}
/**
 * Calculate the amount of memory usage of the leaf cache.
 * Sums the size of every node held in the warm and hot caches of all slots.
 * @return the amount of memory usage of the leaf cache.
 */
int64_t calc_leaf_cache_size() {
  _assert_(true);
  int64_t total = 0;
  for (int32_t i = 0; i < SLOTNUM; i++) {
    LeafSlot* slot = lslots_ + i;
    LeafCache* caches[2] = { slot->warm, slot->hot };
    for (int32_t j = 0; j < 2; j++) {
      typename LeafCache::Iterator it = caches[j]->begin();
      typename LeafCache::Iterator itend = caches[j]->end();
      for (; it != itend; ++it) {
        total += it.value()->size;
      }
    }
  }
  return total;
}
/**
 * Calculate the total number of nodes in the inner cache.
 * @return the total number of nodes in the inner cache.
 */
int64_t calc_inner_cache_count() {
  _assert_(true);
  int64_t total = 0;
  for (int32_t i = 0; i < SLOTNUM; i++) {
    total += islots_[i].warm->count();
  }
  return total;
}
/**
 * Calculate the amount of memory usage of the inner cache.
 * Sums the size of every node held in the warm cache of all slots.
 * @return the amount of memory usage of the inner cache.
 */
int64_t calc_inner_cache_size() {
  _assert_(true);
  int64_t total = 0;
  for (int32_t i = 0; i < SLOTNUM; i++) {
    typename InnerCache::Iterator it = islots_[i].warm->begin();
    typename InnerCache::Iterator itend = islots_[i].warm->end();
    for (; it != itend; ++it) {
      total += it.value()->size;
    }
  }
  return total;
}
/**
* Disable all cursors.
*/
void disable_cursors() {
_assert_(true);
if (curs_.empty()) return;
typename CursorList::const_iterator cit = curs_.begin();
typename CursorList::const_iterator citend = curs_.end();
while (cit != citend) {
Cursor* cur = *cit;
if (cur->kbuf_) cur->clear_position();
++cit;
}
}
/**
 * Escape cursors on a divided leaf node.
 * Retargets every cursor that sits on the source node and whose key is at
 * or beyond the pivot record, since those records now live in the
 * destination node. (Note: void — unlike the overload below, this variant
 * cannot fail.)
 * @param src the ID of the source node.
 * @param dest the ID of the destination node.
 * @param rec the pivot record.
 */
void escape_cursors(int64_t src, int64_t dest, Record* rec) {
  _assert_(src > 0 && dest >= 0 && rec);
  if (curs_.empty()) return;
  typename CursorList::const_iterator cit = curs_.begin();
  typename CursorList::const_iterator citend = curs_.end();
  while (cit != citend) {
    Cursor* cur = *cit;
    if (cur->lid_ == src) {
      char* dbuf = (char*)rec + sizeof(*rec);
      // keys >= pivot moved to the destination node during the split
      if (reccomp_.comp->compare(cur->kbuf_, cur->ksiz_, dbuf, rec->ksiz) >= 0)
        cur->lid_ = dest;
    }
    ++cit;
  }
}
/**
 * Escape cursors on a removed leaf node.
 * Moves every cursor that sits on the removed node to the start of the
 * destination node (typically the removed node's successor).
 * @param src the ID of the source node.
 * @param dest the ID of the destination node.
 * @return true on success, or false on failure.
 */
bool escape_cursors(int64_t src, int64_t dest) {
  _assert_(src > 0 && dest >= 0);
  if (curs_.empty()) return true;
  bool err = false;
  typename CursorList::const_iterator cit = curs_.begin();
  typename CursorList::const_iterator citend = curs_.end();
  while (cit != citend) {
    Cursor* cur = *cit;
    if (cur->lid_ == src) {
      cur->clear_position();
      // failing to land on the destination is only an error if it is not
      // simply an empty/no-record condition
      if (!cur->set_position(dest) && db_.error().code() != Error::NOREC) err = true;
    }
    ++cit;
  }
  return !err;
}
/**
* Recalculate the count data.
* @return true on success, or false on failure.
*/
bool recalc_count() {
_assert_(true);
if (!load_meta()) return false;
bool err = false;
std::set ids;
std::set prevs;
std::set nexts;
class VisitorImpl : public DB::Visitor {
public:
explicit VisitorImpl(std::set* ids,
std::set* prevs, std::set* nexts) :
ids_(ids), prevs_(prevs), nexts_(nexts), count_(0) {}
int64_t count() {
return count_;
}
private:
const char* visit_full(const char* kbuf, size_t ksiz,
const char* vbuf, size_t vsiz, size_t* sp) {
if (ksiz < 2 || ksiz >= NUMBUFSIZ || kbuf[0] != LNPREFIX) return NOP;
kbuf++;
ksiz--;
char tkbuf[NUMBUFSIZ];
std::memcpy(tkbuf, kbuf, ksiz);
tkbuf[ksiz] = '\0';
int64_t id = atoih(tkbuf);
uint64_t prev;
size_t step = readvarnum(vbuf, vsiz, &prev);
if (step < 1) return NOP;
vbuf += step;
vsiz -= step;
uint64_t next;
step = readvarnum(vbuf, vsiz, &next);
if (step < 1) return NOP;
vbuf += step;
vsiz -= step;
ids_->insert(id);
if (prev > 0) prevs_->insert(prev);
if (next > 0) nexts_->insert(next);
while (vsiz > 1) {
uint64_t rksiz;
step = readvarnum(vbuf, vsiz, &rksiz);
if (step < 1) break;
vbuf += step;
vsiz -= step;
uint64_t rvsiz;
step = readvarnum(vbuf, vsiz, &rvsiz);
if (step < 1) break;
vbuf += step;
vsiz -= step;
if (vsiz < rksiz + rvsiz) break;
vbuf += rksiz;
vsiz -= rksiz;
vbuf += rvsiz;
vsiz -= rvsiz;
count_++;
}
return NOP;
}
std::set* ids_;
std::set* prevs_;
std::set* nexts_;
int64_t count_;
} visitor(&ids, &prevs, &nexts);
if (!db_.iterate(&visitor, false)) err = true;
int64_t count = visitor.count();
db_.report(_KCCODELINE_, Logger::WARN, "recalculated the record count from %lld to %lld",
(long long)count_, (long long)count);
std::set::iterator iitend = ids.end();
std::set::iterator nit = nexts.begin();
std::set::iterator nitend = nexts.end();
while (nit != nitend) {
if (ids.find(*nit) == ids.end()) {
db_.report(_KCCODELINE_, Logger::WARN, "detected missing leaf: %lld", (long long)*nit);
count = INT64MAX;
}
++nit;
}
std::set::iterator pit = prevs.begin();
std::set::iterator pitend = prevs.end();
while (pit != pitend) {
if (ids.find(*pit) == iitend) {
db_.report(_KCCODELINE_, Logger::WARN, "detected missing leaf: %lld", (long long)*pit);
count = INT64MAX;
}
++pit;
}
count_ = count;
if (!dump_meta()) err = true;
return !err;
}
  /**
   * Reorganize the database file.
   * @param mode the connection mode of the internal database.
   * @return true on success, or false on failure.
   * @note Every user record reachable through the leaf nodes is copied into
   * a freshly built tree beside the original, which is then swapped into
   * place.  The swap strategy depends on the database type.
   */
  bool reorganize_file(uint32_t mode) {
    _assert_(true);
    // if the meta data cannot be loaded, fall back to sane comparators
    if (!load_meta()) {
      if (reccomp_.comp) {
        linkcomp_.comp = reccomp_.comp;
      } else {
        reccomp_.comp = LEXICALCOMP;
        linkcomp_.comp = LEXICALCOMP;
      }
    }
    const std::string& path = db_.path();
    const std::string& npath = path + File::EXTCHR + KCPDBTMPPATHEXT;
    // build the destination tree with the same record comparator
    PlantDB tdb;
    tdb.tune_comparator(reccomp_.comp);
    if (!tdb.open(npath, OWRITER | OCREATE | OTRUNCATE)) {
      set_error(_KCCODELINE_, tdb.error().code(), "opening the destination failed");
      return false;
    }
    db_.report(_KCCODELINE_, Logger::WARN, "reorganizing the database");
    bool err = false;
    create_leaf_cache();
    create_inner_cache();
    // scan every record of the underlying database and salvage leaf nodes
    DB::Cursor* cur = db_.cursor();
    cur->jump();
    char* kbuf;
    size_t ksiz;
    while (!err && (kbuf = cur->get_key(&ksiz)) != NULL) {
      if (*kbuf == LNPREFIX) {
        // keys of leaf nodes carry the node ID in hexadecimal after the prefix
        int64_t id = std::strtol(kbuf + 1, NULL, 16);
        if (id > 0 && id < INIDBASE) {
          LeafNode* node = load_leaf_node(id, false);
          if (node) {
            // copy every user record of this leaf into the destination tree
            const RecordArray& recs = node->recs;
            typename RecordArray::const_iterator rit = recs.begin();
            typename RecordArray::const_iterator ritend = recs.end();
            while (rit != ritend) {
              Record* rec = *rit;
              // the key is stored immediately after the record header
              char* dbuf = (char*)rec + sizeof(*rec);
              if (!tdb.set(dbuf, rec->ksiz, dbuf + rec->ksiz, rec->vsiz)) {
                // NOTE(review): this message looks copy-pasted from the open
                // failure above; "setting a record failed" would be more
                // accurate -- confirm before changing the string
                set_error(_KCCODELINE_, tdb.error().code(),
                          "opening the destination failed");
                err = true;
              }
              ++rit;
            }
            flush_leaf_node(node, false);
          }
        }
      }
      delete[] kbuf;
      cur->step();
    }
    delete cur;
    delete_inner_cache();
    delete_leaf_cache();
    if (!tdb.close()) {
      set_error(_KCCODELINE_, tdb.error().code(), "opening the destination failed");
      err = true;
    }
    if (DBTYPE == TYPETREE) {
      // single-file tree: rename the new file over the old one and reopen
      if (File::rename(npath, path)) {
        if (!db_.close()) err = true;
        if (!db_.open(path, mode)) err = true;
      } else {
        set_error(_KCCODELINE_, Error::SYSTEM, "renaming the destination failed");
        err = true;
      }
      File::remove(npath);
    } else if (DBTYPE == TYPEFOREST) {
      // directory tree: park the old directory aside so the swap can be
      // rolled back if renaming the new one into place fails
      const std::string& tpath = npath + File::EXTCHR + KCPDBTMPPATHEXT;
      File::remove_recursively(tpath);
      if (File::rename(path, tpath)) {
        if (File::rename(npath, path)) {
          if (!db_.close()) err = true;
          if (!db_.open(path, mode)) err = true;
        } else {
          set_error(_KCCODELINE_, Error::SYSTEM, "renaming the destination failed");
          File::rename(tpath, path);
          err = true;
        }
      } else {
        set_error(_KCCODELINE_, Error::SYSTEM, "renaming the source failed");
        err = true;
      }
      File::remove_recursively(tpath);
      File::remove_recursively(npath);
    } else {
      // other base types: reopen the rebuilt tree read-only and copy its
      // records back into the (cleared) original database
      BASEDB udb;
      if (!err && udb.open(npath, OREADER)) {
        if (writer_) {
          if (!db_.clear()) err = true;
        } else {
          // reopen the original as a writer temporarily
          if (!db_.close()) err = true;
          uint32_t tmode = (mode & ~OREADER) | OWRITER | OCREATE | OTRUNCATE;
          if (!db_.open(path, tmode)) err = true;
        }
        cur = udb.cursor();
        cur->jump();
        const char* vbuf;
        size_t vsiz;
        while (!err && (kbuf = cur->get(&ksiz, &vbuf, &vsiz)) != NULL) {
          if (!db_.set(kbuf, ksiz, vbuf, vsiz)) err = true;
          delete[] kbuf;
          cur->step();
        }
        delete cur;
        if (writer_) {
          if (!db_.synchronize(false, NULL)) err = true;
        } else {
          // restore the originally requested open mode
          if (!db_.close()) err = true;
          if (!db_.open(path, mode)) err = true;
        }
        if (!udb.close()) {
          set_error(_KCCODELINE_, udb.error().code(), "closing the destination failed");
          err = true;
        }
      } else {
        set_error(_KCCODELINE_, udb.error().code(), "opening the destination failed");
        err = true;
      }
      File::remove_recursively(npath);
    }
    return !err;
  }
/**
* Begin transaction.
* @param hard true for physical synchronization with the device, or false for logical
* synchronization with the file system.
* @return true on success, or false on failure.
*/
bool begin_transaction_impl(bool hard) {
_assert_(true);
if (!clean_leaf_cache()) return false;
if (!clean_inner_cache()) return false;
int32_t idx = trclock_++ % SLOTNUM;
LeafSlot* lslot = lslots_ + idx;
if (lslot->warm->count() + lslot->hot->count() > 1) flush_leaf_cache_part(lslot);
InnerSlot* islot = islots_ + idx;
if (islot->warm->count() > 1) flush_inner_cache_part(islot);
if ((trlcnt_ != lcnt_ || count_ != trcount_) && !dump_meta()) return false;
if (!db_.begin_transaction(hard)) return false;
return true;
}
/**
* Commit transaction.
* @return true on success, or false on failure.
*/
bool commit_transaction() {
_assert_(true);
bool err = false;
if (!clean_leaf_cache()) return false;
if (!clean_inner_cache()) return false;
if ((trlcnt_ != lcnt_ || count_ != trcount_) && !dump_meta()) err = true;
if (!db_.end_transaction(true)) return false;
return !err;
}
/**
* Abort transaction.
* @return true on success, or false on failure.
*/
bool abort_transaction() {
_assert_(true);
bool err = false;
flush_leaf_cache(false);
flush_inner_cache(false);
if (!db_.end_transaction(false)) err = true;
if (!load_meta()) err = true;
disable_cursors();
return !err;
}
/**
* Fix auto transaction for the B+ tree.
* @return true on success, or false on failure.
*/
bool fix_auto_transaction_tree() {
_assert_(true);
if (!db_.begin_transaction(autosync_)) return false;
bool err = false;
if (!clean_leaf_cache()) err = true;
if (!clean_inner_cache()) err = true;
size_t cnum = ATRANCNUM / SLOTNUM;
int32_t idx = trclock_++ % SLOTNUM;
LeafSlot* lslot = lslots_ + idx;
if (lslot->warm->count() + lslot->hot->count() > cnum) flush_leaf_cache_part(lslot);
InnerSlot* islot = islots_ + idx;
if (islot->warm->count() > cnum) flush_inner_cache_part(islot);
if (!dump_meta()) err = true;
if (!db_.end_transaction(true)) err = true;
return !err;
}
/**
* Fix auto transaction for a leaf.
* @return true on success, or false on failure.
*/
bool fix_auto_transaction_leaf(LeafNode* node) {
_assert_(node);
bool err = false;
if (!save_leaf_node(node)) err = true;
return !err;
}
/**
* Fix auto synchronization.
* @return true on success, or false on failure.
*/
bool fix_auto_synchronization() {
_assert_(true);
bool err = false;
if (!flush_leaf_cache(true)) err = true;
if (!flush_inner_cache(true)) err = true;
if (!dump_meta()) err = true;
if (!db_.synchronize(true, NULL)) err = true;
return !err;
}
/**
* Write the key pattern into a buffer.
* @param kbuf the destination buffer.
* @param pc the prefix character.
* @param id the ID number of the page.
* @return the size of the key pattern.
*/
size_t write_key(char* kbuf, int32_t pc, int64_t num) {
_assert_(kbuf && num >= 0);
char* wp = kbuf;
*(wp++) = pc;
bool hit = false;
for (size_t i = 0; i < sizeof(num); i++) {
uint8_t c = num >> ((sizeof(num) - 1 - i) * 8);
uint8_t h = c >> 4;
if (h < 10) {
if (hit || h != 0) {
*(wp++) = '0' + h;
hit = true;
}
} else {
*(wp++) = 'A' - 10 + h;
hit = true;
}
uint8_t l = c & 0xf;
if (l < 10) {
if (hit || l != 0) {
*(wp++) = '0' + l;
hit = true;
}
} else {
*(wp++) = 'A' - 10 + l;
hit = true;
}
}
return wp - kbuf;
}
  /** Dummy constructor to forbid the use. */
  PlantDB(const PlantDB&);
  /** Dummy assignment operator to forbid the use. */
  PlantDB& operator =(const PlantDB&);
  /** The internal meta operation trigger. */
  MetaTrigger* mtrigger_;
  /** The open mode. */
  uint32_t omode_;
  /** The flag for writer. */
  bool writer_;
  /** The flag for auto transaction. */
  bool autotran_;
  /** The flag for auto synchronization. */
  bool autosync_;
  /** The internal database. */
  BASEDB db_;
  /** The cursor objects. */
  CursorList curs_;
  /** The alignment power. */
  uint8_t apow_;
  /** The free block pool power. */
  uint8_t fpow_;
  /** The options. */
  uint8_t opts_;
  /** The bucket number. */
  int64_t bnum_;
  /** The page size. */
  int32_t psiz_;
  /** The capacity of page cache. */
  int64_t pccap_;
  /** The root node. */
  int64_t root_;
  /** The first node. */
  int64_t first_;
  /** The last node. */
  int64_t last_;
  /** The count of leaf nodes. */
  int64_t lcnt_;
  /** The count of inner nodes. */
  int64_t icnt_;
  /** The record number. */
  AtomicInt64 count_;
  /** The cache memory usage. */
  AtomicInt64 cusage_;
  /** The slots of leaf nodes. */
  LeafSlot lslots_[SLOTNUM];
  /** The slots of inner nodes. */
  InnerSlot islots_[SLOTNUM];
  /** The record comparator. */
  RecordComparator reccomp_;
  /** The link comparator. */
  LinkComparator linkcomp_;
  /** The flag whether in transaction. */
  bool tran_;
  /** The logical clock for transaction. */
  int64_t trclock_;
  /** The leaf count history for transaction (compared with lcnt_ to decide
      whether the meta record must be rewritten). */
  int64_t trlcnt_;
  /** The record count history for transaction (compared with count_ to decide
      whether the meta record must be rewritten). */
  int64_t trcount_;
};
} // common namespace
#endif // duplication check
// END OF FILE