diff options
Diffstat (limited to 'libs/tdlib/td/tddb')
31 files changed, 3285 insertions, 0 deletions
diff --git a/libs/tdlib/td/tddb/CMakeLists.txt b/libs/tdlib/td/tddb/CMakeLists.txt new file mode 100644 index 0000000000..531dcc5c02 --- /dev/null +++ b/libs/tdlib/td/tddb/CMakeLists.txt @@ -0,0 +1,55 @@ +cmake_minimum_required(VERSION 3.0.2 FATAL_ERROR) + +#SOURCE SETS +set(TDDB_SOURCE + td/db/binlog/Binlog.cpp + td/db/binlog/BinlogEvent.cpp + td/db/binlog/ConcurrentBinlog.cpp + td/db/binlog/detail/BinlogEventsBuffer.cpp + td/db/binlog/detail/BinlogEventsProcessor.cpp + + td/db/SqliteDb.cpp + td/db/SqliteStatement.cpp + td/db/SqliteKeyValueAsync.cpp + + td/db/detail/RawSqliteDb.cpp + + td/db/binlog/Binlog.h + td/db/binlog/BinlogInterface.h + td/db/binlog/BinlogEvent.h + td/db/binlog/BinlogHelper.h + td/db/binlog/ConcurrentBinlog.h + td/db/binlog/detail/BinlogEventsBuffer.h + td/db/binlog/detail/BinlogEventsProcessor.h + + td/db/BinlogKeyValue.h + td/db/DbKey.h + td/db/KeyValueSyncInterface.h + td/db/Pmc.h + td/db/SeqKeyValue.h + td/db/SqliteConnectionSafe.h + td/db/SqliteDb.h + td/db/SqliteKeyValue.h + td/db/SqliteKeyValueAsync.h + td/db/SqliteKeyValueSafe.h + td/db/SqliteStatement.h + td/db/TsSeqKeyValue.h + + td/db/detail/RawSqliteDb.h +) + +add_library(tddb STATIC ${TDDB_SOURCE}) +target_include_directories(tddb PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>) +target_link_libraries(tddb PUBLIC tdactor tdutils PRIVATE tdsqlite) + +if (NOT CMAKE_CROSSCOMPILING) + add_executable(binlog_dump td/db/binlog/binlog_dump.cpp) + target_link_libraries(binlog_dump PRIVATE tddb) +endif() + +install(TARGETS tddb EXPORT TdTargets + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib + RUNTIME DESTINATION bin + INCLUDES DESTINATION include +) diff --git a/libs/tdlib/td/tddb/td/db/BinlogKeyValue.h b/libs/tdlib/td/tddb/td/db/BinlogKeyValue.h new file mode 100644 index 0000000000..04df413d53 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/BinlogKeyValue.h @@ -0,0 +1,252 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/PromiseFuture.h" + +#include "td/db/binlog/Binlog.h" +#include "td/db/binlog/BinlogEvent.h" +#include "td/db/binlog/ConcurrentBinlog.h" +#include "td/db/KeyValueSyncInterface.h" + +#include "td/utils/buffer.h" +#include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/misc.h" +#include "td/utils/port/RwMutex.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" +#include "td/utils/tl_parsers.h" +#include "td/utils/tl_storers.h" + +#include <map> +#include <memory> +#include <unordered_map> +#include <utility> + +namespace td { +template <class BinlogT> +class BinlogKeyValue : public KeyValueSyncInterface { + public: + static constexpr int32 magic = 0x2a280000; + + struct Event : public Storer { + Event() = default; + Event(Slice key, Slice value) : key(key), value(value) { + } + Slice key; + Slice value; + template <class StorerT> + void store(StorerT &&storer) const { + storer.store_string(key); + storer.store_string(value); + } + + template <class ParserT> + void parse(ParserT &&parser) { + key = parser.template fetch_string<Slice>(); + value = parser.template fetch_string<Slice>(); + } + + size_t size() const override { + TlStorerCalcLength storer; + store(storer); + return storer.get_length(); + } + size_t store(uint8 *ptr_x) const override { + auto ptr = reinterpret_cast<char *>(ptr_x); + TlStorerUnsafe storer(ptr); + store(storer); + return storer.get_buf() - ptr; + } + }; + + int32 get_magic() const { + return magic_; + } + + Status init(string name, DbKey db_key = DbKey::empty(), int scheduler_id = -1, + int32 override_magic = 0) TD_WARN_UNUSED_RESULT { + close(); + if (override_magic) { + magic_ = override_magic; + } + + binlog_ = std::make_unique<BinlogT>(); + TRY_STATUS(binlog_->init(name, + [&](const BinlogEvent &binlog_event) { + Event event; + event.parse(TlParser(binlog_event.data_)); + map_.emplace(std::make_pair(event.key.str(), + std::make_pair(event.value.str(), binlog_event.id_))); + }, + std::move(db_key), DbKey::empty(), scheduler_id)); + return Status::OK(); + } + + void external_init_begin(int32 override_magic = 0) { + close(); + if (override_magic) { + magic_ = override_magic; + } + } + + template <class OtherBinlogT> + void external_init_handle(BinlogKeyValue<OtherBinlogT> &&other) { + map_ = std::move(other.map_); + } + + void external_init_handle(const BinlogEvent &binlog_event) { + Event event; + event.parse(TlParser(binlog_event.data_)); + map_.emplace(std::make_pair(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_))); + } + + void external_init_finish(std::shared_ptr<BinlogT> binlog) { + binlog_ = std::move(binlog); + } + + void close() { + *this = BinlogKeyValue(); + } + + SeqNo set(string key, string value) override { + auto lock = rw_mutex_.lock_write().move_as_ok(); + uint64 old_id = 0; + auto it_ok = map_.insert({key, {value, 0}}); + if (!it_ok.second) { + if (it_ok.first->second.first == value) { + return 0; + } + old_id = it_ok.first->second.second; + it_ok.first->second.first = value; + } + bool rewrite = false; + uint64 id; + auto seq_no = binlog_->next_id(); + if (old_id != 0) { + rewrite = true; + id = old_id; + } else { + id = seq_no; + it_ok.first->second.second = id; + } + + lock.reset(); + add_event(seq_no, + BinlogEvent::create_raw(id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value})); + return seq_no; + } + + SeqNo erase(const string &key) override { + auto lock = rw_mutex_.lock_write().move_as_ok(); + auto it = map_.find(key); + if (it == map_.end()) { + return 0; + } + uint64 id = it->second.second; + map_.erase(it); + // LOG(ERROR) << "ADD EVENT"; + auto seq_no = binlog_->next_id(); + lock.reset(); + add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, + EmptyStorer())); + return seq_no; + } + + void add_event(uint64 seq_no, BufferSlice &&event) { + binlog_->add_raw_event(seq_no, std::move(event)); + } + + bool isset(const string &key) override { + auto lock = rw_mutex_.lock_read().move_as_ok(); + return map_.count(key) > 0; + } + + string get(const string &key) override { + auto lock = rw_mutex_.lock_read().move_as_ok(); + auto it = map_.find(key); + if (it == map_.end()) { + return string(); + } + return it->second.first; + } + + void force_sync(Promise<> &&promise) { + binlog_->force_sync(std::move(promise)); + } + void lazy_sync(Promise<> &&promise) { + binlog_->lazy_sync(std::move(promise)); + } + + std::unordered_map<string, string> prefix_get(Slice prefix) { + // TODO: optimize with std::map? + auto lock = rw_mutex_.lock_write().move_as_ok(); + std::unordered_map<string, string> res; + for (const auto &kv : map_) { + if (begins_with(kv.first, prefix)) { + res[kv.first] = kv.second.first; + } + } + return res; + } + + std::unordered_map<string, string> get_all() { + auto lock = rw_mutex_.lock_write().move_as_ok(); + std::unordered_map<string, string> res; + for (const auto &kv : map_) { + res[kv.first] = kv.second.first; + } + return res; + } + + void erase_by_prefix(Slice prefix) { + auto lock = rw_mutex_.lock_write().move_as_ok(); + std::vector<uint64> ids; + for (auto it = map_.begin(); it != map_.end();) { + if (begins_with(it->first, prefix)) { + ids.push_back(it->second.second); + it = map_.erase(it); + } else { + ++it; + } + } + auto seq_no = binlog_->next_id(narrow_cast<int32>(ids.size())); + lock.reset(); + for (auto id : ids) { + add_event(seq_no, BinlogEvent::create_raw(id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, + EmptyStorer())); + seq_no++; + } + } + template <class T> + friend class BinlogKeyValue; + + static Status destroy(Slice name) { + return Binlog::destroy(name); + } + + private: + std::unordered_map<string, std::pair<string, uint64>> map_; + std::shared_ptr<BinlogT> binlog_; + RwMutex rw_mutex_; + int32 magic_ = magic; +}; +template <> +inline void BinlogKeyValue<Binlog>::add_event(uint64 seq_no, BufferSlice &&event) { + binlog_->add_raw_event(std::move(event)); +} +template <> +inline void BinlogKeyValue<Binlog>::force_sync(Promise<> &&promise) { + binlog_->sync(); + promise.set_value(Unit()); +} +template <> +inline void BinlogKeyValue<Binlog>::lazy_sync(Promise<> &&promise) { + force_sync(std::move(promise)); +} +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/DbKey.h b/libs/tdlib/td/tddb/td/db/DbKey.h new file mode 100644 index 0000000000..b0edde2ae1 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/DbKey.h @@ -0,0 +1,51 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once +#include "td/utils/common.h" +#include "td/utils/Slice.h" + +namespace td { +class DbKey { + public: + enum Type { Empty, RawKey, Password }; + + Type type() const { + return type_; + } + bool is_empty() const { + return type_ == Empty; + } + bool is_raw_key() const { + return type_ == RawKey; + } + bool is_password() const { + return type_ == Password; + } + CSlice data() const { + return data_; + } + static DbKey raw_key(string raw_key) { + DbKey res; + res.type_ = RawKey; + res.data_ = std::move(raw_key); + return res; + } + static DbKey password(string password) { + DbKey res; + res.type_ = Password; + res.data_ = std::move(password); + return res; + } + static DbKey empty() { + return DbKey(); + } + + private: + Type type_{Empty}; + string data_; +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/KeyValueSyncInterface.h b/libs/tdlib/td/tddb/td/db/KeyValueSyncInterface.h new file mode 100644 index 0000000000..8d19d0e75c --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/KeyValueSyncInterface.h @@ -0,0 +1,35 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/common.h" + +namespace td { + +class KeyValueSyncInterface { + public: + // SeqNo is used to restore total order on all write queries. + // Some implementations may return 0 as SeqNo. + using SeqNo = uint64; + + KeyValueSyncInterface() = default; + KeyValueSyncInterface(const KeyValueSyncInterface &) = delete; + KeyValueSyncInterface &operator=(const KeyValueSyncInterface &) = delete; + KeyValueSyncInterface(KeyValueSyncInterface &&) = default; + KeyValueSyncInterface &operator=(KeyValueSyncInterface &&) = default; + virtual ~KeyValueSyncInterface() = default; + + virtual SeqNo set(string key, string value) = 0; + + virtual bool isset(const string &key) = 0; + + virtual string get(const string &key) = 0; + + virtual SeqNo erase(const string &key) = 0; +}; + +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/Pmc.h b/libs/tdlib/td/tddb/td/db/Pmc.h new file mode 100644 index 0000000000..dcf0e0c351 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/Pmc.h @@ -0,0 +1,27 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/db/binlog/ConcurrentBinlog.h" +#include "td/db/BinlogKeyValue.h" +#include "td/db/SqliteKeyValue.h" + +#include "td/utils/common.h" + +#include <memory> + +namespace td { + +using BinlogPmcBase = BinlogKeyValue<ConcurrentBinlog>; +using BinlogPmc = std::shared_ptr<BinlogPmcBase>; +using BinlogPmcPtr = BinlogPmcBase *; + +using BigPmcBase = SqliteKeyValue; +using BigPmc = std::unique_ptr<BigPmcBase>; +using BigPmcPtr = BigPmcBase *; + +}; // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SeqKeyValue.h b/libs/tdlib/td/tddb/td/db/SeqKeyValue.h new file mode 100644 index 0000000000..ec6f2b99b6 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SeqKeyValue.h @@ -0,0 +1,78 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/Slice.h" + +#include <unordered_map> + +namespace td { +class SeqKeyValue { + public: + using SeqNo = uint64; + SeqKeyValue() = default; + SeqKeyValue(SeqKeyValue &&) = default; + SeqKeyValue &operator=(SeqKeyValue &&) = default; + SeqKeyValue(const SeqKeyValue &) = delete; + SeqKeyValue &operator=(const SeqKeyValue &) = delete; + ~SeqKeyValue() = default; + + SeqNo set(Slice key, Slice value) { + auto it_ok = map_.insert({key.str(), value.str()}); + if (!it_ok.second) { + if (it_ok.first->second == value) { + return 0; + } + it_ok.first->second = value.str(); + } + return next_seq_no(); + } + SeqNo erase(const string &key) { + auto it = map_.find(key); + if (it == map_.end()) { + return 0; + } + map_.erase(it); + return next_seq_no(); + } + SeqNo seq_no() const { + return current_id_ + 1; + } + string get(const string &key) const { + auto it = map_.find(key); + if (it == map_.end()) { + return string(); + } + return it->second; + } + + template <class F> + void foreach (const F &f) { + for (auto &it : map_) { + f(it.first, it.second); + } + } + + size_t size() const { + return map_.size(); + } + + void reset_seq_no() { + current_id_ = 0; + } + std::unordered_map<string, string> get_all() const { + return map_; + } + + private: + std::unordered_map<string, string> map_; + SeqNo current_id_ = 0; + SeqNo next_seq_no() { + return ++current_id_; + } +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteConnectionSafe.h b/libs/tdlib/td/tddb/td/db/SqliteConnectionSafe.h new file mode 100644 index 0000000000..6e45e79e63 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteConnectionSafe.h @@ -0,0 +1,53 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/SchedulerLocalStorage.h" + +#include "td/db/SqliteDb.h" + +#include "td/utils/common.h" +#include "td/utils/format.h" +#include "td/utils/logging.h" + +namespace td { + +class SqliteConnectionSafe { + public: + SqliteConnectionSafe() = default; + explicit SqliteConnectionSafe(string name, DbKey key = DbKey::empty()) + : lsls_connection_([name = name, key = std::move(key)] { + auto db = SqliteDb::open_with_key(name, key).move_as_ok(); + db.exec("PRAGMA synchronous=NORMAL").ensure(); + db.exec("PRAGMA temp_store=MEMORY").ensure(); + db.exec("PRAGMA secure_delete=1").ensure(); + db.exec("PRAGMA recursive_triggers=1").ensure(); + return db; + }) + , name_(std::move(name)) { + } + + SqliteDb &get() { + return lsls_connection_.get(); + } + + void close() { + LOG(INFO) << "Close sqlite db " << tag("path", name_); + lsls_connection_.clear_values(); + } + void close_and_destroy() { + close(); + LOG(INFO) << "Destroy sqlite db " << tag("path", name_); + SqliteDb::destroy(name_).ignore(); + } + + private: + LazySchedulerLocalStorage<SqliteDb> lsls_connection_; + string name_; +}; + +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteDb.cpp b/libs/tdlib/td/tddb/td/db/SqliteDb.cpp new file mode 100644 index 0000000000..819818197d --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteDb.cpp @@ -0,0 +1,228 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/SqliteDb.h" + +#include "td/utils/format.h" +#include "td/utils/port/path.h" +#include "td/utils/port/Stat.h" +#include "td/utils/Status.h" +#include "td/utils/StringBuilder.h" +#include "td/utils/Timer.h" + +#include "sqlite/sqlite3.h" + +namespace td { + +namespace { +string db_key_to_sqlcipher_key(const DbKey &db_key) { + if (db_key.is_empty()) { + return "''"; + } + if (db_key.is_password()) { + return PSTRING() << "'" << db_key.data().str() << "'"; + } + CHECK(db_key.is_raw_key()); + Slice raw_key = db_key.data(); + CHECK(raw_key.size() == 32); + size_t expected_size = 64 + 5; + string res(expected_size + 50, ' '); + StringBuilder sb(res); + sb << '"'; + sb << 'x'; + sb << '\''; + sb << format::as_hex_dump<0>(raw_key); + sb << '\''; + sb << '"'; + CHECK(!sb.is_error()); + CHECK(sb.as_cslice().size() == expected_size); + res.resize(expected_size); + return res; +} +} // namespace + +SqliteDb::~SqliteDb() = default; + +Status SqliteDb::init(CSlice path, bool *was_created) { + // If database does not exist, delete all other files which may left + // from older database + bool is_db_exists = stat(path).is_ok(); + if (!is_db_exists) { + destroy(path).ignore(); + } + + if (was_created != nullptr) { + *was_created = !is_db_exists; + } + sqlite3 *db; + CHECK(sqlite3_threadsafe() != 0); + int rc = sqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE /*| SQLITE_OPEN_SHAREDCACHE*/, + nullptr); + if (rc != SQLITE_OK) { + auto res = Status::Error(PSLICE() << "Failed to open db: " << detail::RawSqliteDb::last_error(db)); + sqlite3_close(db); + return res; + } + sqlite3_busy_timeout(db, 1000 * 5 /* 5 seconds */); + raw_ = std::make_shared<detail::RawSqliteDb>(db, path.str()); + return Status::OK(); +} + +static void trace_callback(void *ptr, const char *query) { + LOG(ERROR) << query; +} +static int trace_v2_callback(unsigned code, void *ctx, void *p_raw, void *x_raw) { + CHECK(code == SQLITE_TRACE_STMT); + auto x = static_cast<const char *>(x_raw); + if (x[0] == '-' && x[1] == '-') { + trace_callback(ctx, x); + } else { + trace_callback(ctx, sqlite3_expanded_sql(static_cast<sqlite3_stmt *>(p_raw))); + } + + return 0; +} +void SqliteDb::trace(bool flag) { + sqlite3_trace_v2(raw_->db(), SQLITE_TRACE_STMT, flag ? trace_v2_callback : nullptr, nullptr); +} + +Status SqliteDb::exec(CSlice cmd) { + CHECK(!empty()); + char *msg; + VLOG(sqlite) << "Start exec " << tag("cmd", cmd) << tag("db", raw_->db()); + auto rc = sqlite3_exec(raw_->db(), cmd.c_str(), nullptr, nullptr, &msg); + VLOG(sqlite) << "Finish exec " << tag("cmd", cmd) << tag("db", raw_->db()); + if (rc != SQLITE_OK) { + CHECK(msg != nullptr); + return Status::Error(PSLICE() << tag("query", cmd) << " failed: " << msg); + } + CHECK(msg == nullptr); + return Status::OK(); +} + +Result<bool> SqliteDb::has_table(Slice table) { + TRY_RESULT(stmt, get_statement(PSLICE() << "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='" << table + << "'")); + TRY_STATUS(stmt.step()); + CHECK(stmt.has_row()); + auto cnt = stmt.view_int32(0); + return cnt == 1; +} +Result<string> SqliteDb::get_pragma(Slice name) { + TRY_RESULT(stmt, get_statement(PSLICE() << "PRAGMA " << name)); + TRY_STATUS(stmt.step()); + CHECK(stmt.has_row()); + auto res = stmt.view_blob(0).str(); + TRY_STATUS(stmt.step()); + CHECK(!stmt.can_step()); + return std::move(res); +} + +Result<int32> SqliteDb::user_version() { + TRY_RESULT(get_version_stmt, get_statement("PRAGMA user_version")); + TRY_STATUS(get_version_stmt.step()); + if (!get_version_stmt.has_row()) { + return Status::Error("PRAGMA user_version failed"); + } + return get_version_stmt.view_int32(0); +} + +Status SqliteDb::set_user_version(int32 version) { + return exec(PSLICE() << "PRAGMA user_version = " << version); +} + +Status SqliteDb::begin_transaction() { + return exec("BEGIN"); +} +Status SqliteDb::commit_transaction() { + return exec("COMMIT"); +} + +bool SqliteDb::is_encrypted() { + return exec("SELECT count(*) FROM sqlite_master").is_error(); +} + +Result<SqliteDb> SqliteDb::open_with_key(CSlice path, const DbKey &db_key) { + SqliteDb db; + TRY_STATUS(db.init(path)); + if (!db_key.is_empty()) { + if (!db.is_encrypted()) { + return Status::Error("No key is needed"); + } + auto key = db_key_to_sqlcipher_key(db_key); + TRY_STATUS(db.exec(PSLICE() << "PRAGMA key = " << key)); + } + if (db.is_encrypted()) { + return Status::Error("Wrong key"); + } + return std::move(db); +} + +Status SqliteDb::change_key(CSlice path, const DbKey &new_db_key, const DbKey &old_db_key) { + // fast path + { + auto r_db = open_with_key(path, new_db_key); + if (r_db.is_ok()) { + return Status::OK(); + } + } + + TRY_RESULT(db, open_with_key(path, old_db_key)); + TRY_RESULT(user_version, db.user_version()); + auto new_key = db_key_to_sqlcipher_key(new_db_key); + if (old_db_key.is_empty() && !new_db_key.is_empty()) { + LOG(DEBUG) << "ENCRYPT"; + // Encrypt + PerfWarningTimer timer("Encrypt sqlite database", 0.1); + auto tmp_path = path.str() + ".ecnrypted"; + unlink(tmp_path).ignore(); + + // make shure that database is not empty + TRY_STATUS(db.exec("CREATE TABLE IF NOT EXISTS encryption_dummy_table(id INT PRIMARY KEY)")); + //NB: not really safe + TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << tmp_path << "' AS encrypted KEY " << new_key)); + TRY_STATUS(db.exec("SELECT sqlcipher_export('encrypted')")); + TRY_STATUS(db.exec(PSLICE() << "PRAGMA encrypted.user_version = " << user_version)); + TRY_STATUS(db.exec("DETACH DATABASE encrypted")); + db.close(); + TRY_STATUS(rename(tmp_path, path)); + } else if (!old_db_key.is_empty() && new_db_key.is_empty()) { + LOG(DEBUG) << "DECRYPT"; + // Dectypt + PerfWarningTimer timer("Decrypt sqlite database", 0.1); + auto tmp_path = path.str() + ".ecnrypted"; + unlink(tmp_path).ignore(); + + //NB: not really safe + TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << tmp_path << "' AS decrypted KEY ''")); + TRY_STATUS(db.exec("SELECT sqlcipher_export('decrypted')")); + TRY_STATUS(db.exec(PSLICE() << "PRAGMA decrypted.user_version = " << user_version)); + TRY_STATUS(db.exec("DETACH DATABASE decrypted")); + db.close(); + TRY_STATUS(rename(tmp_path, path)); + } else { + LOG(DEBUG) << "REKEY"; + PerfWarningTimer timer("Rekey sqlite database", 0.1); + TRY_STATUS(db.exec(PSLICE() << "PRAGMA rekey = " << new_key)); + } + + TRY_RESULT(new_db, open_with_key(path, new_db_key)); + CHECK(new_db.user_version().ok() == user_version); + return Status::OK(); +} +Status SqliteDb::destroy(Slice path) { + return detail::RawSqliteDb::destroy(path); +} + +Result<SqliteStatement> SqliteDb::get_statement(CSlice statement) { + sqlite3_stmt *stmt = nullptr; + auto rc = sqlite3_prepare_v2(get_native(), statement.c_str(), static_cast<int>(statement.size()) + 1, &stmt, nullptr); + if (rc != SQLITE_OK) { + return Status::Error(PSLICE() << "Failed to prepare sqlite " << tag("stmt", statement) << raw_->last_error()); + } + return SqliteStatement(stmt, raw_); +} +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteDb.h b/libs/tdlib/td/tddb/td/db/SqliteDb.h new file mode 100644 index 0000000000..40137464ce --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteDb.h @@ -0,0 +1,86 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/db/DbKey.h" +#include "td/db/SqliteStatement.h" + +#include "td/db/detail/RawSqliteDb.h" + +#include "td/utils/logging.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" + +#include <memory> + +struct sqlite3; + +namespace td { + +class SqliteDb { + public: + SqliteDb() = default; + explicit SqliteDb(CSlice path) { + auto status = init(path); + LOG_IF(FATAL, status.is_error()) << status; + } + SqliteDb(SqliteDb &&) = default; + SqliteDb &operator=(SqliteDb &&) = default; + SqliteDb(const SqliteDb &) = delete; + SqliteDb &operator=(const SqliteDb &) = delete; + ~SqliteDb(); + + // dangerous + SqliteDb clone() const { + return SqliteDb(raw_); + } + + bool empty() const { + return !raw_; + } + void close() { + *this = SqliteDb(); + } + + Status init(CSlice path, bool *was_created = nullptr) TD_WARN_UNUSED_RESULT; + Status exec(CSlice cmd) TD_WARN_UNUSED_RESULT; + Result<bool> has_table(Slice table); + Result<string> get_pragma(Slice name); + Status begin_transaction(); + Status commit_transaction(); + + Result<int32> user_version(); + Status set_user_version(int32 version); + void trace(bool flag); + + static Status destroy(Slice path) TD_WARN_UNUSED_RESULT; + + // Anyway we can't change the key on the fly, so static functions is more than enough + static Result<SqliteDb> open_with_key(CSlice path, const DbKey &db_key); + static Status change_key(CSlice path, const DbKey &new_db_key, const DbKey &old_db_key); + + Status last_error(); + + sqlite3 *get_native() const { + return raw_->db(); + } + + Result<SqliteStatement> get_statement(CSlice statement) TD_WARN_UNUSED_RESULT; + + template <class F> + static void with_db_path(Slice main_path, F &&f) { + detail::RawSqliteDb::with_db_path(main_path, f); + } + + private: + explicit SqliteDb(std::shared_ptr<detail::RawSqliteDb> raw) : raw_(std::move(raw)) { + } + std::shared_ptr<detail::RawSqliteDb> raw_; + + bool is_encrypted(); +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteKeyValue.h b/libs/tdlib/td/tddb/td/db/SqliteKeyValue.h new file mode 100644 index 0000000000..6fe050e7f2 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteKeyValue.h @@ -0,0 +1,221 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/db/SqliteDb.h" +#include "td/db/SqliteStatement.h" + +#include "td/utils/logging.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" + +#include <unordered_map> + +namespace td { +class SqliteKeyValue { + public: + static Status drop(SqliteDb &connection, Slice kv_name) TD_WARN_UNUSED_RESULT { + return connection.exec(PSLICE() << "DROP TABLE IF EXISTS " << kv_name); + } + + static Status init(SqliteDb &connection, Slice kv_name) TD_WARN_UNUSED_RESULT { + return connection.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << kv_name << " (k BLOB PRIMARY KEY, v BLOB)"); + } + + using SeqNo = uint64; + Result<bool> init(string name) TD_WARN_UNUSED_RESULT { + name_ = std::move(name); + bool is_created = false; + SqliteDb db; + TRY_STATUS(db.init(name, &is_created)); + TRY_STATUS(db.exec("PRAGMA encoding=\"UTF-8\"")); + TRY_STATUS(db.exec("PRAGMA synchronous=NORMAL")); + TRY_STATUS(db.exec("PRAGMA journal_mode=WAL")); + TRY_STATUS(db.exec("PRAGMA temp_store=MEMORY")); + TRY_STATUS(init_with_connection(std::move(db), "KV")); + return is_created; + } + + Status init_with_connection(SqliteDb connection, string kv_name) { + db_ = std::move(connection); + kv_name_ = std::move(kv_name); + TRY_STATUS(init(db_, kv_name_)); + TRY_STATUS(db_.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << kv_name_ << " (k BLOB PRIMARY KEY, v BLOB)")); + + TRY_RESULT(set_stmt, db_.get_statement(PSLICE() << "REPLACE INTO " << kv_name_ << " (k, v) VALUES (?1, ?2)")); + set_stmt_ = std::move(set_stmt); + TRY_RESULT(get_stmt, db_.get_statement(PSLICE() << "SELECT v FROM " << kv_name_ << " WHERE k = ?1")); + get_stmt_ = std::move(get_stmt); + TRY_RESULT(erase_stmt, db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE k = ?1")); + erase_stmt_ = std::move(erase_stmt); + TRY_RESULT(get_all_stmt, db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << "")); + + TRY_RESULT(erase_by_prefix_stmt, + db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE ?1 <= k AND k < ?2")); + erase_by_prefix_stmt_ = std::move(erase_by_prefix_stmt); + + TRY_RESULT(erase_by_prefix_rare_stmt, + db_.get_statement(PSLICE() << "DELETE FROM " << kv_name_ << " WHERE ?1 <= k")); + erase_by_prefix_rare_stmt_ = std::move(erase_by_prefix_rare_stmt); + + TRY_RESULT(get_by_prefix_stmt, + db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << " WHERE ?1 <= k AND k < ?2")); + get_by_prefix_stmt_ = std::move(get_by_prefix_stmt); + + TRY_RESULT(get_by_prefix_rare_stmt, + db_.get_statement(PSLICE() << "SELECT k, v FROM " << kv_name_ << " WHERE ?1 <= k")); + get_by_prefix_rare_stmt_ = std::move(get_by_prefix_rare_stmt); + + get_all_stmt_ = std::move(get_all_stmt); + return Status::OK(); + } + + Result<bool> try_regenerate_index() TD_WARN_UNUSED_RESULT { + return false; + } + void close() { + clear(); + } + void close_silent() { + clear(); + } + static Status destroy(Slice name) { + return SqliteDb::destroy(name); + } + void close_and_destroy() { + db_.exec(PSLICE() << "DROP TABLE IF EXISTS " << kv_name_).ensure(); + auto name = std::move(name_); + clear(); + if (!name.empty()) { + SqliteDb::destroy(name).ignore(); + } + } + + SeqNo set(Slice key, Slice value) { + set_stmt_.bind_blob(1, key).ensure(); + set_stmt_.bind_blob(2, value).ensure(); + set_stmt_.step().ensure(); + set_stmt_.reset(); + return 0; + } + + SeqNo erase(Slice key) { + erase_stmt_.bind_blob(1, key).ensure(); + erase_stmt_.step().ensure(); + erase_stmt_.reset(); + return 0; + } + string get(Slice key) { + SCOPE_EXIT { + get_stmt_.reset(); + }; + get_stmt_.bind_blob(1, key).ensure(); + get_stmt_.step().ensure(); + if (!get_stmt_.has_row()) { + return ""; + } + auto data = get_stmt_.view_blob(0).str(); + get_stmt_.step().ignore(); + return data; + } + + Status begin_transaction() { + return db_.begin_transaction(); + } + Status commit_transaction() { + return db_.commit_transaction(); + } + + void erase_by_prefix(Slice prefix) { + auto next = next_prefix(prefix); + if (next.empty()) { + SCOPE_EXIT { + erase_by_prefix_rare_stmt_.reset(); + }; + erase_by_prefix_rare_stmt_.bind_blob(1, prefix).ensure(); + erase_by_prefix_rare_stmt_.step().ensure(); + } else { + SCOPE_EXIT { + erase_by_prefix_stmt_.reset(); + }; + erase_by_prefix_stmt_.bind_blob(1, prefix).ensure(); + erase_by_prefix_stmt_.bind_blob(2, next).ensure(); + erase_by_prefix_stmt_.step().ensure(); + } + }; + + std::unordered_map<string, string> get_all() { + std::unordered_map<string, string> res; + get_by_prefix("", [&](Slice key, Slice value) { res.emplace(key.str(), value.str()); }); + return res; + } + + template <class CallbackT> + void get_by_prefix(Slice prefix, CallbackT &&callback) { + string next; + if (!prefix.empty()) { + next = next_prefix(prefix); + } + get_by_range(prefix, next, callback); + } + template <class CallbackT> + void get_by_range(Slice from, Slice till, CallbackT &&callback) { + SqliteStatement *stmt = nullptr; + if (from.empty()) { + stmt = &get_all_stmt_; + } else { + if (till.empty()) { + stmt = &get_by_prefix_rare_stmt_; + stmt->bind_blob(1, till).ensure(); + } else { + stmt = &get_by_prefix_stmt_; + stmt->bind_blob(1, from).ensure(); + stmt->bind_blob(2, till).ensure(); + } + } + auto guard = stmt->guard(); + stmt->step().ensure(); + while (stmt->has_row()) { + callback(stmt->view_blob(0), stmt->view_blob(1)); + stmt->step().ensure(); + } + } + + void clear() { + *this = SqliteKeyValue(); + } + + private: + string name_; // deprecated + string kv_name_; + SqliteDb db_; + SqliteStatement get_stmt_; + SqliteStatement set_stmt_; + SqliteStatement erase_stmt_; + SqliteStatement get_all_stmt_; + SqliteStatement erase_by_prefix_stmt_; + SqliteStatement erase_by_prefix_rare_stmt_; + SqliteStatement get_by_prefix_stmt_; + SqliteStatement get_by_prefix_rare_stmt_; + + string next_prefix(Slice prefix) { + string next = prefix.str(); + size_t pos = next.size(); + while (pos) { + pos--; + auto value = static_cast<uint8>(next[pos]); + value++; + next[pos] = static_cast<char>(value); + if (value != 0) { + return next; + } + } + return string{}; + } +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp b/libs/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp new file mode 100644 index 0000000000..e8575b62f1 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteKeyValueAsync.cpp @@ -0,0 +1,141 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/SqliteKeyValueAsync.h" + +#include "td/utils/optional.h" +#include "td/utils/Time.h" + +#include <unordered_map> + +namespace td { +class SqliteKeyValueAsync : public SqliteKeyValueAsyncInterface { + public: + explicit SqliteKeyValueAsync(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int32 scheduler_id = -1) { + impl_ = create_actor_on_scheduler<Impl>("KV", scheduler_id, std::move(kv_safe)); + } + void set(string key, string value, Promise<> promise) override { + send_closure_later(impl_, &Impl::set, std::move(key), std::move(value), std::move(promise)); + } + void erase(string key, Promise<> promise) override { + send_closure_later(impl_, &Impl::erase, std::move(key), std::move(promise)); + } + void get(string key, Promise<string> promise) override { + send_closure_later(impl_, &Impl::get, std::move(key), std::move(promise)); + } + void close(Promise<> promise) override { + send_closure_later(impl_, &Impl::close, std::move(promise)); + } + + private: + class Impl : public Actor { + public: + explicit Impl(std::shared_ptr<SqliteKeyValueSafe> kv_safe) : kv_safe_(std::move(kv_safe)) { + } + void set(string key, string value, Promise<> promise) { + auto it = buffer_.find(key); + if (it != buffer_.end()) { + it->second = std::move(value); + } else { + buffer_.emplace(std::move(key), std::move(value)); + } + if (promise) { + buffer_promises_.push_back(std::move(promise)); + } + cnt_++; + do_flush(false /*force*/); + } + void erase(string key, Promise<> promise) { + auto it = buffer_.find(key); + if (it != buffer_.end()) { + it->second = optional<string>(); + } else { + buffer_.emplace(std::move(key), optional<string>()); + } + if (promise) { + buffer_promises_.push_back(std::move(promise)); + } + cnt_++; + do_flush(false /*force*/); + } + + void get(const string &key, Promise<string> promise) { + auto it = buffer_.find(key); + if (it != buffer_.end()) { + return promise.set_value(it->second ? it->second.value() : ""); + } + promise.set_value(kv_->get(key)); + } + void close(Promise<> promise) { + do_flush(true /*force*/); + kv_safe_.reset(); + kv_ = nullptr; + stop(); + promise.set_value(Unit()); + } + + private: + std::shared_ptr<SqliteKeyValueSafe> kv_safe_; + SqliteKeyValue *kv_ = nullptr; + + static constexpr double MAX_PENDING_QUERIES_DELAY = 1; + static constexpr size_t MAX_PENDING_QUERIES_COUNT = 100; + std::unordered_map<string, optional<string>> buffer_; + std::vector<Promise<>> buffer_promises_; + size_t cnt_ = 0; + + double wakeup_at_ = 0; + void do_flush(bool force) { + if (buffer_.empty()) { + return; + } + + if (!force) { + auto now = Time::now_cached(); + if (wakeup_at_ == 0) { + wakeup_at_ = now + MAX_PENDING_QUERIES_DELAY; + } + if (now < wakeup_at_ && cnt_ < MAX_PENDING_QUERIES_COUNT) { + set_timeout_at(wakeup_at_); + return; + } + } + + wakeup_at_ = 0; + cnt_ = 0; + + kv_->begin_transaction().ensure(); + for (auto &it : buffer_) { + if (it.second) { + kv_->set(it.first, it.second.value()); + } else { + kv_->erase(it.first); + } + } + kv_->commit_transaction().ensure(); + buffer_.clear(); + for (auto &promise : buffer_promises_) { + promise.set_value(Unit()); + } + buffer_promises_.clear(); + } + + void timeout_expired() override { + do_flush(false /*force*/); + } + + void start_up() override { + kv_ = &kv_safe_->get(); + } + }; + ActorOwn<Impl> impl_; +}; +std::unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv, + int32 scheduler_id) { + return std::make_unique<SqliteKeyValueAsync>(std::move(kv), scheduler_id); +} + +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h b/libs/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h new file mode 100644 index 0000000000..6015d26fb2 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteKeyValueAsync.h @@ -0,0 +1,30 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/db/SqliteKeyValueSafe.h" + +#include "td/actor/PromiseFuture.h" + +#include <memory> + +namespace td { + +class SqliteKeyValueAsyncInterface { + public: + virtual ~SqliteKeyValueAsyncInterface() = default; + + virtual void set(string key, string value, Promise<> promise) = 0; + virtual void erase(string key, Promise<> promise) = 0; + + virtual void get(string key, Promise<string> promise) = 0; + virtual void close(Promise<> promise) = 0; +}; + +std::unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv, + int32 scheduler_id = 1); +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h b/libs/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h new file mode 100644 index 0000000000..d63af3cfb2 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteKeyValueSafe.h @@ -0,0 +1,36 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/db/SqliteConnectionSafe.h" +#include "td/db/SqliteKeyValue.h" + +#include <memory> + +namespace td { + +class SqliteKeyValueSafe { + public: + SqliteKeyValueSafe(string name, std::shared_ptr<SqliteConnectionSafe> safe_connection) + : lsls_kv_([name = std::move(name), safe_connection = std::move(safe_connection)] { + SqliteKeyValue kv; + kv.init_with_connection(safe_connection->get().clone(), name).ensure(); + return kv; + }) { + } + SqliteKeyValue &get() { + return lsls_kv_.get(); + } + void close() { + lsls_kv_.clear_values(); + } + + private: + LazySchedulerLocalStorage<SqliteKeyValue> lsls_kv_; +}; + +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteStatement.cpp b/libs/tdlib/td/tddb/td/db/SqliteStatement.cpp new file mode 100644 index 0000000000..c9ce8c3e8e --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteStatement.cpp @@ -0,0 +1,203 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/SqliteStatement.h" + +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/StackAllocator.h" +#include "td/utils/StringBuilder.h" + +#include "sqlite/sqlite3.h" + +namespace td { + +namespace { +int printExplainQueryPlan(StringBuilder &sb, sqlite3_stmt *pStmt) { + const char *zSql = sqlite3_sql(pStmt); + if (zSql == nullptr) { + return SQLITE_ERROR; + } + + sb << "Explain " << tag("cmd", zSql); + char *zExplain = sqlite3_mprintf("EXPLAIN QUERY PLAN %s", zSql); + if (zExplain == nullptr) { + return SQLITE_NOMEM; + } + + sqlite3_stmt *pExplain; /* Compiled EXPLAIN QUERY PLAN command */ + int rc = sqlite3_prepare_v2(sqlite3_db_handle(pStmt), zExplain, -1, &pExplain, nullptr); + sqlite3_free(zExplain); + if (rc != SQLITE_OK) { + return rc; + } + + while (SQLITE_ROW == sqlite3_step(pExplain)) { + int iSelectid = sqlite3_column_int(pExplain, 0); + int iOrder = sqlite3_column_int(pExplain, 1); + int iFrom = sqlite3_column_int(pExplain, 2); + const char *zDetail = reinterpret_cast<const char *>(sqlite3_column_text(pExplain, 3)); + + sb << "\n" << iSelectid << " " << iOrder << " " << iFrom << " " << zDetail; + } + + return sqlite3_finalize(pExplain); +} +} // namespace + +SqliteStatement::SqliteStatement(sqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db) + : stmt_(stmt), db_(std::move(db)) { + CHECK(stmt != nullptr); +} +SqliteStatement::~SqliteStatement() = default; + +Result<string> SqliteStatement::explain() { + if (empty()) { + return Status::Error("No statement"); + } + auto tmp = StackAllocator::alloc(10000); + StringBuilder sb(tmp.as_slice()); + auto code = printExplainQueryPlan(sb, stmt_.get()); + if (code != SQLITE_OK) { + return last_error(); + } + if (sb.is_error()) { + return Status::Error("StringBuilder buffer overflow"); + } + return sb.as_cslice().str(); +} +Status SqliteStatement::bind_blob(int id, Slice blob) { + auto rc = sqlite3_bind_blob(stmt_.get(), id, blob.data(), static_cast<int>(blob.size()), nullptr); + if (rc != SQLITE_OK) { + return last_error(); + } + return Status::OK(); +} +Status SqliteStatement::bind_string(int id, Slice str) { + auto rc = sqlite3_bind_text(stmt_.get(), id, str.data(), static_cast<int>(str.size()), nullptr); + if (rc != SQLITE_OK) { + return last_error(); + } + return Status::OK(); +} + +Status SqliteStatement::bind_int32(int id, int32 value) { + auto rc = sqlite3_bind_int(stmt_.get(), id, value); + if (rc != SQLITE_OK) { + return last_error(); + } + return Status::OK(); +} +Status SqliteStatement::bind_int64(int id, int64 value) { + auto rc = sqlite3_bind_int64(stmt_.get(), id, value); + if (rc != SQLITE_OK) { + return last_error(); + } + return Status::OK(); +} +Status SqliteStatement::bind_null(int id) { + auto rc = sqlite3_bind_null(stmt_.get(), id); + if (rc != SQLITE_OK) { + return last_error(); + } + return Status::OK(); +} + +StringBuilder &operator<<(StringBuilder &sb, SqliteStatement::Datatype type) { + using Datatype = SqliteStatement::Datatype; + switch (type) { + case Datatype::Integer: + return sb << "Integer"; + case Datatype::Float: + return sb << "Float"; + case Datatype::Blob: + return sb << "Blob"; + case Datatype::Null: + return sb << "Null"; + case Datatype::Text: + return sb << "Text"; + } + UNREACHABLE(); + return sb; +} +Slice SqliteStatement::view_blob(int id) { + LOG_IF(ERROR, view_datatype(id) != Datatype::Blob) << view_datatype(id); + auto *data = sqlite3_column_blob(stmt_.get(), id); + auto size = sqlite3_column_bytes(stmt_.get(), id); + if (data == nullptr) { + return Slice(); + } + return Slice(static_cast<const char *>(data), size); +} +Slice SqliteStatement::view_string(int id) { + LOG_IF(ERROR, view_datatype(id) != Datatype::Text) << view_datatype(id); + auto *data = sqlite3_column_text(stmt_.get(), id); + auto size = sqlite3_column_bytes(stmt_.get(), id); + if (data == nullptr) { + return Slice(); + } + return Slice(data, size); +} +int32 SqliteStatement::view_int32(int id) { + LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id); + return sqlite3_column_int(stmt_.get(), id); +} +int64 SqliteStatement::view_int64(int id) { + LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id); + return sqlite3_column_int64(stmt_.get(), id); +} +SqliteStatement::Datatype SqliteStatement::view_datatype(int id) { + auto type = sqlite3_column_type(stmt_.get(), id); + switch (type) { + case SQLITE_INTEGER: + return Datatype::Integer; + case SQLITE_FLOAT: + return Datatype::Float; + case SQLITE_BLOB: + return Datatype::Blob; + case SQLITE_NULL: + return Datatype::Null; + case SQLITE3_TEXT: + return Datatype::Text; + default: + UNREACHABLE(); + } +} + +void SqliteStatement::reset() { + sqlite3_reset(stmt_.get()); + state_ = Start; +} + +Status SqliteStatement::step() { + if (state_ == Finish) { + return Status::Error("One has to reset statement"); + } + VLOG(sqlite) << "Start step " << tag("cmd", sqlite3_sql(stmt_.get())) << tag("stmt", stmt_.get()) + << tag("db", db_.get()); + auto rc = sqlite3_step(stmt_.get()); + VLOG(sqlite) << "Finish step " << tag("cmd", sqlite3_sql(stmt_.get())) << tag("stmt", stmt_.get()) + << tag("db", db_.get()); + if (rc == SQLITE_ROW) { + state_ = GotRow; + return Status::OK(); + } + if (rc == SQLITE_DONE) { + state_ = Finish; + return Status::OK(); + } + state_ = Finish; + return last_error(); +} + +void SqliteStatement::StmtDeleter::operator()(sqlite3_stmt *stmt) { + sqlite3_finalize(stmt); +} + +Status SqliteStatement::last_error() { + return db_->last_error(); +} +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/SqliteStatement.h b/libs/tdlib/td/tddb/td/db/SqliteStatement.h new file mode 100644 index 0000000000..2e2182ff7e --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/SqliteStatement.h @@ -0,0 +1,80 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/common.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" + +#include "td/db/detail/RawSqliteDb.h" + +#include <memory> + +struct sqlite3; +struct sqlite3_stmt; + +namespace td { + +class SqliteStatement { + public: + SqliteStatement() = default; + SqliteStatement(const SqliteStatement &other) = delete; + SqliteStatement &operator=(const SqliteStatement &other) = delete; + SqliteStatement(SqliteStatement &&other) = default; + SqliteStatement &operator=(SqliteStatement &&other) = default; + ~SqliteStatement(); + + Status bind_blob(int id, Slice blob) TD_WARN_UNUSED_RESULT; + Status bind_string(int id, Slice str) TD_WARN_UNUSED_RESULT; + Status bind_int32(int id, int32 value) TD_WARN_UNUSED_RESULT; + Status bind_int64(int id, int64 value) TD_WARN_UNUSED_RESULT; + Status bind_null(int id) TD_WARN_UNUSED_RESULT; + Status step() TD_WARN_UNUSED_RESULT; + Slice view_string(int id) TD_WARN_UNUSED_RESULT; + Slice view_blob(int id) TD_WARN_UNUSED_RESULT; + int32 view_int32(int id) TD_WARN_UNUSED_RESULT; + int64 view_int64(int id) TD_WARN_UNUSED_RESULT; + enum class Datatype { Integer, Float, Blob, Null, Text }; + Datatype view_datatype(int id); + + Result<string> explain(); + + bool can_step() const { + return state_ != Finish; + } + bool has_row() const { + return state_ == GotRow; + } + bool empty() const { + return !stmt_; + } + + void reset(); + + auto guard() { + return ScopeExit{} + [this] { this->reset(); }; + } + + // TODO get row + + private: + friend class SqliteDb; + SqliteStatement(sqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db); + + class StmtDeleter { + public: + void operator()(sqlite3_stmt *stmt); + }; + + enum { Start, GotRow, Finish } state_ = Start; + + std::unique_ptr<sqlite3_stmt, StmtDeleter> stmt_; + std::shared_ptr<detail::RawSqliteDb> db_; + + Status last_error(); +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/TsSeqKeyValue.h b/libs/tdlib/td/tddb/td/db/TsSeqKeyValue.h new file mode 100644 index 0000000000..8d94d79673 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/TsSeqKeyValue.h @@ -0,0 +1,71 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/db/SeqKeyValue.h" + +#include "td/utils/port/RwMutex.h" +#include "td/utils/Slice.h" + +#include <unordered_map> +#include <utility> + +namespace td { +class TsSeqKeyValue { + public: + using SeqNo = SeqKeyValue::SeqNo; + TsSeqKeyValue() = default; + explicit TsSeqKeyValue(SeqKeyValue kv) : kv_(std::move(kv)) { + } + + TsSeqKeyValue(TsSeqKeyValue &&) = default; + TsSeqKeyValue &operator=(TsSeqKeyValue &&) = default; + TsSeqKeyValue(const TsSeqKeyValue &) = delete; + TsSeqKeyValue &operator=(const TsSeqKeyValue &) = delete; + ~TsSeqKeyValue() = default; + + SeqNo set(Slice key, Slice value) { + auto lock = rw_mutex_.lock_write().move_as_ok(); + return kv_.set(key, value); + } + std::pair<SeqNo, RwMutex::WriteLock> set_and_lock(Slice key, Slice value) { + auto lock = rw_mutex_.lock_write().move_as_ok(); + return std::make_pair(kv_.set(key, value), std::move(lock)); + } + SeqNo erase(const string &key) { + auto lock = rw_mutex_.lock_write().move_as_ok(); + return kv_.erase(key); + } + std::pair<SeqNo, RwMutex::WriteLock> erase_and_lock(const string &key) { + auto lock = rw_mutex_.lock_write().move_as_ok(); + return std::make_pair(kv_.erase(key), std::move(lock)); + } + string get(const string &key) { + auto lock = rw_mutex_.lock_read().move_as_ok(); + return kv_.get(key); + } + size_t size() const { + return kv_.size(); + } + std::unordered_map<string, string> get_all() { + auto lock = rw_mutex_.lock_write().move_as_ok(); + return kv_.get_all(); + } + // not thread safe method + SeqKeyValue &inner() { + return kv_; + } + + auto lock() { + return rw_mutex_.lock_write().move_as_ok(); + } + + private: + RwMutex rw_mutex_; + SeqKeyValue kv_; +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/Binlog.cpp b/libs/tdlib/td/tddb/td/db/binlog/Binlog.cpp new file mode 100644 index 0000000000..5d76028dba --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/Binlog.cpp @@ -0,0 +1,629 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/binlog/Binlog.h" + +#include "td/db/binlog/detail/BinlogEventsBuffer.h" +#include "td/db/binlog/detail/BinlogEventsProcessor.h" + +#include "td/utils/buffer.h" +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/misc.h" +#include "td/utils/port/Clocks.h" +#include "td/utils/port/Fd.h" +#include "td/utils/port/path.h" +#include "td/utils/port/Stat.h" +#include "td/utils/Random.h" +#include "td/utils/ScopeGuard.h" +#include "td/utils/Status.h" +#include "td/utils/Time.h" +#include "td/utils/tl_helpers.h" +#include "td/utils/tl_parsers.h" + +namespace td { +namespace detail { +struct AesCtrEncryptionEvent { + static constexpr size_t min_salt_size() { + return 16; // 256 bits + } + static constexpr size_t default_salt_size() { + return 32; // 256 bits + } + static constexpr size_t key_size() { + return 32; // 256 bits + } + static constexpr size_t iv_size() { + return 16; // 128 bits + } + static constexpr size_t hash_size() { + return 32; // 256 bits + } + static constexpr size_t kdf_iteration_count() { + return 60002; + } + static constexpr size_t kdf_fast_iteration_count() { + return 2; + } + + BufferSlice key_salt_; + BufferSlice iv_; + BufferSlice key_hash_; + + BufferSlice generate_key(const DbKey &db_key) { + CHECK(!db_key.is_empty()); + BufferSlice key(key_size()); + size_t iteration_count = kdf_iteration_count(); + if (db_key.is_raw_key()) { + iteration_count = kdf_fast_iteration_count(); + } + pbkdf2_sha256(db_key.data(), key_salt_.as_slice(), narrow_cast<int>(iteration_count), key.as_slice()); + return key; + } + BufferSlice generate_hash(Slice key) { + BufferSlice hash(hash_size()); + hmac_sha256(key, "cucumbers everywhere", hash.as_slice()); + return hash; + } + + template <class StorerT> + void store(StorerT &storer) const { + using td::store; + BEGIN_STORE_FLAGS(); + END_STORE_FLAGS(); + store(key_salt_, storer); + store(iv_, storer); + store(key_hash_, storer); + } + template <class ParserT> + void parse(ParserT &&parser) { + using td::parse; + BEGIN_PARSE_FLAGS(); + END_PARSE_FLAGS(); + parse(key_salt_, parser); + parse(iv_, parser); + parse(key_hash_, parser); + } +}; + +class BinlogReader { + public: + BinlogReader() = default; + explicit BinlogReader(ChainBufferReader *input) : input_(input) { + } + void set_input(ChainBufferReader *input) { + input_ = input; + } + + int64 offset() { + return offset_; + } + Result<size_t> read_next(BinlogEvent *event) { + if (state_ == ReadLength) { + if (input_->size() < 4) { + return 4; + } + auto it = input_->clone(); + + char buf[4]; + it.advance(4, MutableSlice(buf, 4)); + size_ = static_cast<size_t>(TlParser(Slice(buf, 4)).fetch_int()); + + if (size_ > MAX_EVENT_SIZE) { + return Status::Error(PSLICE() << "Too big event " << tag("size", size_)); + } + if (size_ < MIN_EVENT_SIZE) { + return Status::Error(PSLICE() << "Too small event " << tag("size", size_)); + } + state_ = ReadEvent; + } + + if (input_->size() < size_) { + return size_; + } + + TRY_STATUS(event->init(input_->cut_head(size_).move_as_buffer_slice())); + offset_ += size_; + event->offset_ = offset_; + state_ = ReadLength; + return 0; + } + + private: + ChainBufferReader *input_; + enum { ReadLength, ReadEvent } state_ = ReadLength; + size_t size_{0}; + int64 offset_{0}; +}; +} // namespace detail + +bool Binlog::IGNORE_ERASE_HACK = false; + +Binlog::Binlog() = default; + +Binlog::~Binlog() { + close().ignore(); +} + +Result<FileFd> Binlog::open_binlog(CSlice path, int32 flags) { + TRY_RESULT(fd, FileFd::open(path, flags)); + TRY_STATUS(fd.lock(FileFd::LockFlags::Write, 100)); + return std::move(fd); +} + +Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey old_db_key, int32 dummy, + const Callback &debug_callback) { + close().ignore(); + + db_key_ = std::move(db_key); + old_db_key_ = std::move(old_db_key); + + processor_ = std::make_unique<detail::BinlogEventsProcessor>(); + // Turn off BinlogEventsBuffer + // events_buffer_ = std::make_unique<detail::BinlogEventsBuffer>(); + + // try to restore binlog from regenerated version + if (stat(path).is_error()) { + rename(PSLICE() << path << ".new", path).ignore(); + } + + info_ = BinlogInfo(); + info_.was_created = stat(path).is_error(); + + TRY_RESULT(fd, open_binlog(path, FileFd::Flags::Read | FileFd::Flags::Write | FileFd::Flags::Create)); + fd_ = BufferedFdBase<FileFd>(std::move(fd)); + fd_size_ = 0; + path_ = std::move(path); + + auto status = load_binlog(callback, debug_callback); + if (status.is_error()) { + close().ignore(); + return status; + } + info_.last_id = processor_->last_id(); + last_id_ = processor_->last_id(); + if (info_.wrong_password) { + close().ignore(); + return Status::Error(Error::WrongPassword, "Wrong password"); + } + + if ((!db_key_.is_empty() && !db_key_used_) || (db_key_.is_empty() && encryption_type_ != EncryptionType::None)) { + aes_ctr_key_salt_ = BufferSlice(); + do_reindex(); + } + + info_.is_opened = true; + return Status::OK(); +} + +void Binlog::add_event(BinlogEvent &&event) { + if (!events_buffer_) { + do_add_event(std::move(event)); + } else { + events_buffer_->add_event(std::move(event)); + } + lazy_flush(); + + if (state_ == State::Run) { + auto fd_size = fd_size_; + if (events_buffer_) { + fd_size += events_buffer_->size(); + } + auto need_reindex = [&](int64 min_size, int rate) { + return fd_size > min_size && fd_size / rate > processor_->total_raw_events_size(); + }; + if (need_reindex(100000, 5) || need_reindex(500000, 2)) { + LOG(INFO) << tag("fd_size", format::as_size(fd_size)) + << tag("total events size", format::as_size(processor_->total_raw_events_size())); + do_reindex(); + } + } +} + +size_t Binlog::flush_events_buffer(bool force) { + if (!events_buffer_) { + return 0; + } + if (!force && !events_buffer_->need_flush()) { + return events_buffer_->size(); + } + CHECK(!in_flush_events_buffer_); + in_flush_events_buffer_ = true; + events_buffer_->flush([&](BinlogEvent &&event) { this->do_add_event(std::move(event)); }); + in_flush_events_buffer_ = false; + return 0; +} + +void Binlog::do_add_event(BinlogEvent &&event) { + if (event.flags_ & BinlogEvent::Flags::Partial) { + event.flags_ &= ~BinlogEvent::Flags::Partial; + pending_events_.emplace_back(std::move(event)); + } else { + for (auto &pending_event : pending_events_) { + do_event(std::move(pending_event)); + } + pending_events_.clear(); + do_event(std::move(event)); + } +} + +Status Binlog::close(bool need_sync) { + if (fd_.empty()) { + return Status::OK(); + } + SCOPE_EXIT { + path_ = ""; + info_.is_opened = false; + fd_.close(); + need_sync_ = false; + }; + if (need_sync) { + sync(); + } else { + flush(); + } + return Status::OK(); +} + +void Binlog::change_key(DbKey new_db_key) { + db_key_ = std::move(new_db_key); + aes_ctr_key_salt_ = BufferSlice(); + do_reindex(); +} + +Status Binlog::close_and_destroy() { + auto path = path_; + auto close_status = close(false); + destroy(path).ignore(); + return close_status; +} +Status Binlog::destroy(Slice path) { + unlink(PSLICE() << path).ignore(); + unlink(PSLICE() << path << ".new").ignore(); + return Status::OK(); +} + +void Binlog::do_event(BinlogEvent &&event) { + fd_events_++; + fd_size_ += event.raw_event_.size(); + + if (state_ == State::Run || state_ == State::Reindex) { + VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ") << event; + switch (encryption_type_) { + case EncryptionType::None: { + buffer_writer_.append(event.raw_event_.clone()); + break; + } + case EncryptionType::AesCtr: { + buffer_writer_.append(event.raw_event_.as_slice()); + break; + } + } + } + + if (event.type_ < 0) { + if (event.type_ == BinlogEvent::ServiceTypes::AesCtrEncryption) { + detail::AesCtrEncryptionEvent encryption_event; + encryption_event.parse(TlParser(event.data_)); + + BufferSlice key; + if (aes_ctr_key_salt_.as_slice() == encryption_event.key_salt_.as_slice()) { + key = BufferSlice(Slice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw))); + } else if (!db_key_.is_empty()) { + key = encryption_event.generate_key(db_key_); + } + + if (encryption_event.generate_hash(key.as_slice()).as_slice() != encryption_event.key_hash_.as_slice()) { + CHECK(state_ == State::Load); + if (!old_db_key_.is_empty()) { + key = encryption_event.generate_key(old_db_key_); + if (encryption_event.generate_hash(key.as_slice()).as_slice() != encryption_event.key_hash_.as_slice()) { + info_.wrong_password = true; + } + } else { + info_.wrong_password = true; + } + } else { + db_key_used_ = true; + } + + encryption_type_ = EncryptionType::AesCtr; + + aes_ctr_key_salt_ = encryption_event.key_salt_.copy(); + update_encryption(key.as_slice(), encryption_event.iv_.as_slice()); + + if (state_ == State::Load) { + update_read_encryption(); + LOG(INFO) << "Load: init encryption"; + } else { + CHECK(state_ == State::Reindex); + flush(); + update_write_encryption(); + //LOG(INFO) << format::cond(state_ == State::Run, "Run", "Reindex") << ": init encryption"; + } + return; + } + } + + if (state_ != State::Reindex) { + processor_->add_event(std::move(event)); + } +} + +void Binlog::sync() { + flush(); + if (need_sync_) { + auto status = fd_.sync(); + LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status; + need_sync_ = false; + } +} + +void Binlog::flush() { + if (state_ == State::Load) { + return; + } + flush_events_buffer(true); + // NB: encryption happens during flush + if (byte_flow_flag_) { + byte_flow_source_.wakeup(); + } + auto r_written = fd_.flush_write(); + r_written.ensure(); + auto written = r_written.ok(); + if (written > 0) { + need_sync_ = true; + } + need_flush_since_ = 0; + LOG_IF(FATAL, fd_.need_flush_write()) << "Failed to flush binlog"; +} + +void Binlog::lazy_flush() { + size_t events_buffer_size = flush_events_buffer(false /*force*/); + buffer_reader_.sync_with_writer(); + auto size = buffer_reader_.size() + events_buffer_size; + if (size > (1 << 14)) { + flush(); + } else if (size > 0 && need_flush_since_ == 0) { + need_flush_since_ = Time::now_cached(); + } +} + +void Binlog::update_read_encryption() { + CHECK(binlog_reader_ptr_); + switch (encryption_type_) { + case EncryptionType::None: { + binlog_reader_ptr_->set_input(&buffer_reader_); + byte_flow_flag_ = false; + break; + } + case EncryptionType::AesCtr: { + byte_flow_source_ = ByteFlowSource(&buffer_reader_); + aes_xcode_byte_flow_ = AesCtrByteFlow(); + aes_xcode_byte_flow_.init(std::move(aes_ctr_state_)); + byte_flow_sink_ = ByteFlowSink(); + byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_; + byte_flow_flag_ = true; + binlog_reader_ptr_->set_input(byte_flow_sink_.get_output()); + break; + } + } +} + +void Binlog::update_write_encryption() { + switch (encryption_type_) { + case EncryptionType::None: { + fd_.set_output_reader(&buffer_reader_); + byte_flow_flag_ = false; + break; + } + case EncryptionType::AesCtr: { + byte_flow_source_ = ByteFlowSource(&buffer_reader_); + aes_xcode_byte_flow_ = AesCtrByteFlow(); + aes_xcode_byte_flow_.init(std::move(aes_ctr_state_)); + byte_flow_sink_ = ByteFlowSink(); + byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_; + byte_flow_flag_ = true; + fd_.set_output_reader(byte_flow_sink_.get_output()); + break; + } + } +} + +Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callback) { + state_ = State::Load; + + buffer_writer_ = ChainBufferWriter(); + buffer_reader_ = buffer_writer_.extract_reader(); + fd_.set_input_writer(&buffer_writer_); + detail::BinlogReader reader; + binlog_reader_ptr_ = &reader; + + update_read_encryption(); + + bool ready_flag = false; + fd_.update_flags(Fd::Flag::Read); + info_.wrong_password = false; + while (true) { + BinlogEvent event; + auto r_need_size = reader.read_next(&event); + if (r_need_size.is_error()) { + LOG(ERROR) << r_need_size.error(); + break; + } + auto need_size = r_need_size.move_as_ok(); + // LOG(ERROR) << "need size = " << need_size; + if (need_size == 0) { + if (IGNORE_ERASE_HACK && event.type_ == BinlogEvent::ServiceTypes::Empty && + (event.flags_ & BinlogEvent::Flags::Rewrite) != 0) { + // skip erase + } else { + if (debug_callback) { + debug_callback(event); + } + do_add_event(std::move(event)); + if (info_.wrong_password) { + return Status::OK(); + } + } + ready_flag = false; + } else { + // TODO(now): fix bug + if (ready_flag) { + break; + } + TRY_STATUS(fd_.flush_read(max(need_size, static_cast<size_t>(4096)))); + buffer_reader_.sync_with_writer(); + if (byte_flow_flag_) { + byte_flow_source_.wakeup(); + } + ready_flag = true; + } + } + + auto offset = processor_->offset(); + processor_->for_each([&](BinlogEvent &event) { + VLOG(binlog) << "Replay binlog event: " << event; + if (callback) { + callback(event); + } + }); + + auto fd_size = fd_.get_size(); + if (offset != fd_size) { + LOG(ERROR) << "Truncate " << tag("path", path_) << tag("old_size", fd_size) << tag("new_size", offset); + fd_.seek(offset).ensure(); + fd_.truncate_to_current_position(offset).ensure(); + db_key_used_ = false; // force reindex + } + CHECK(IGNORE_ERASE_HACK || fd_size_ == offset) << fd_size << " " << fd_size_ << " " << offset; + binlog_reader_ptr_ = nullptr; + state_ = State::Run; + + buffer_writer_ = ChainBufferWriter(); + buffer_reader_ = buffer_writer_.extract_reader(); + + // reuse aes_ctr_state_ + if (encryption_type_ == EncryptionType::AesCtr) { + aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state(); + } + update_write_encryption(); + + return Status::OK(); +} + +static int64 file_size(CSlice path) { + auto r_stat = stat(path); + if (r_stat.is_error()) { + return 0; + } + return r_stat.ok().size_; +} + +void Binlog::update_encryption(Slice key, Slice iv) { + MutableSlice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw)).copy_from(key); + UInt128 aes_ctr_iv; + MutableSlice(aes_ctr_iv.raw, sizeof(aes_ctr_iv.raw)).copy_from(iv); + aes_ctr_state_.init(aes_ctr_key_, aes_ctr_iv); +} + +void Binlog::reset_encryption() { + if (db_key_.is_empty()) { + encryption_type_ = EncryptionType::None; + return; + } + + using EncryptionEvent = detail::AesCtrEncryptionEvent; + EncryptionEvent event; + + if (aes_ctr_key_salt_.empty()) { + event.key_salt_ = BufferSlice(EncryptionEvent::default_salt_size()); + Random::secure_bytes(event.key_salt_.as_slice()); + } else { + event.key_salt_ = aes_ctr_key_salt_.clone(); + } + event.iv_ = BufferSlice(EncryptionEvent::iv_size()); + Random::secure_bytes(event.iv_.as_slice()); + + BufferSlice key; + if (aes_ctr_key_salt_.as_slice() == event.key_salt_.as_slice()) { + key = BufferSlice(Slice(aes_ctr_key_.raw, sizeof(aes_ctr_key_.raw))); + } else { + key = event.generate_key(db_key_); + } + + event.key_hash_ = event.generate_hash(key.as_slice()); + + do_event(BinlogEvent( + BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event)))); +} + +void Binlog::do_reindex() { + flush_events_buffer(true); + // start reindex + CHECK(state_ == State::Run); + state_ = State::Reindex; + SCOPE_EXIT { + state_ = State::Run; + }; + + auto start_time = Clocks::monotonic(); + auto start_size = file_size(path_); + auto start_events = fd_events_; + + string new_path = path_ + ".new"; + + auto r_opened_file = open_binlog(new_path, FileFd::Flags::Write | FileFd::Flags::Create | FileFd::Truncate); + if (r_opened_file.is_error()) { + LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error(); + return; + } + fd_.close(); + fd_ = BufferedFdBase<FileFd>(r_opened_file.move_as_ok()); + + buffer_writer_ = ChainBufferWriter(); + buffer_reader_ = buffer_writer_.extract_reader(); + encryption_type_ = EncryptionType::None; + update_write_encryption(); + + // reindex + fd_size_ = 0; + fd_events_ = 0; + reset_encryption(); + processor_->for_each([&](BinlogEvent &event) { + do_event(std::move(event)); // NB: no move is actually happens + }); + need_sync_ = true; // must sync creation of the file + sync(); + + // finish_reindex + auto status = unlink(path_); + LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status; + status = rename(new_path, path_); + LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status; + + auto finish_time = Clocks::monotonic(); + auto finish_size = fd_size_; + auto finish_events = fd_events_; + CHECK(fd_size_ == file_size(path_)); + + // TODO: print warning only if time or ratio is suspicious + double ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1); + LOG(INFO) << "regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time)) + << tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size)) + << tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events); + + buffer_writer_ = ChainBufferWriter(); + buffer_reader_ = buffer_writer_.extract_reader(); + + // reuse aes_ctr_state_ + if (encryption_type_ == EncryptionType::AesCtr) { + aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state(); + } + update_write_encryption(); +} + +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/Binlog.h b/libs/tdlib/td/tddb/td/db/binlog/Binlog.h new file mode 100644 index 0000000000..cdda868b4e --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/Binlog.h @@ -0,0 +1,143 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/db/binlog/BinlogEvent.h" +#include "td/db/DbKey.h" + +#include "td/utils/AesCtrByteFlow.h" +#include "td/utils/buffer.h" +#include "td/utils/BufferedFd.h" +#include "td/utils/ByteFlow.h" +#include "td/utils/common.h" +#include "td/utils/crypto.h" +#include "td/utils/port/FileFd.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" + +#include <functional> + +namespace td { +struct BinlogInfo { + bool was_created; + uint64 last_id; + bool is_encrypted{false}; + bool wrong_password{false}; + bool is_opened{false}; +}; + +namespace detail { +class BinlogReader; +class BinlogEventsProcessor; +class BinlogEventsBuffer; +}; // namespace detail + +class Binlog { + public: + enum Error : int { WrongPassword = -1 }; + static bool IGNORE_ERASE_HACK; + Binlog(); + Binlog(const Binlog &other) = delete; + Binlog &operator=(const Binlog &other) = delete; + Binlog(Binlog &&other) = delete; + Binlog &operator=(Binlog &&other) = delete; + ~Binlog(); + + using Callback = std::function<void(const BinlogEvent &)>; + Status init(string path, const Callback &callback, DbKey db_key = DbKey::empty(), DbKey old_db_key = DbKey::empty(), + int32 dummy = -1, const Callback &debug_callback = Callback()) TD_WARN_UNUSED_RESULT; + + uint64 next_id() { + return ++last_id_; + } + uint64 next_id(int32 shift) { + auto res = last_id_ + 1; + last_id_ += shift; + return res; + } + uint64 peek_next_id() const { + return last_id_ + 1; + } + + bool empty() const { + return fd_.empty(); + } + + void add_raw_event(BufferSlice &&raw_event) { + add_event(BinlogEvent(std::move(raw_event))); + } + + void add_event(BinlogEvent &&event); + void sync(); + void flush(); + void lazy_flush(); + double need_flush_since() const { + return need_flush_since_; + } + void change_key(DbKey new_db_key); + + Status close(bool need_sync = true) TD_WARN_UNUSED_RESULT; + Status close_and_destroy() TD_WARN_UNUSED_RESULT; + static Status destroy(Slice path) TD_WARN_UNUSED_RESULT; + + CSlice get_path() const { + return path_; + } + + BinlogInfo get_info() const { // works even after binlog was closed + return info_; + } + + private: + BufferedFdBase<FileFd> fd_; + ChainBufferWriter buffer_writer_; + ChainBufferReader buffer_reader_; + detail::BinlogReader *binlog_reader_ptr_; + + BinlogInfo info_; + DbKey db_key_; + bool db_key_used_ = false; + DbKey old_db_key_; + enum class EncryptionType { None, AesCtr } encryption_type_ = EncryptionType::None; + + // AesCtrEncryption + BufferSlice aes_ctr_key_salt_; + UInt256 aes_ctr_key_; + AesCtrState aes_ctr_state_; + + bool byte_flow_flag_ = false; + ByteFlowSource byte_flow_source_; + ByteFlowSink byte_flow_sink_; + AesCtrByteFlow aes_xcode_byte_flow_; + + int64 fd_size_{0}; + uint64 fd_events_{0}; + string path_; + std::vector<BinlogEvent> pending_events_; + std::unique_ptr<detail::BinlogEventsProcessor> processor_; + std::unique_ptr<detail::BinlogEventsBuffer> events_buffer_; + bool in_flush_events_buffer_{false}; + uint64 last_id_{0}; + double need_flush_since_ = 0; + bool need_sync_{false}; + enum class State { Empty, Load, Reindex, Run } state_{State::Empty}; + + static constexpr uint32 MAX_EVENT_SIZE = 65536; + + Result<FileFd> open_binlog(CSlice path, int32 flags); + size_t flush_events_buffer(bool force); + void do_add_event(BinlogEvent &&event); + void do_event(BinlogEvent &&event); + Status load_binlog(const Callback &callback, const Callback &debug_callback = Callback()) TD_WARN_UNUSED_RESULT; + void do_reindex(); + + void update_encryption(Slice key, Slice iv); + void reset_encryption(); + void update_read_encryption(); + void update_write_encryption(); +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp b/libs/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp new file mode 100644 index 0000000000..e4584e920e --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/BinlogEvent.cpp @@ -0,0 +1,38 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/binlog/BinlogEvent.h" + +#include "td/utils/tl_parsers.h" + +namespace td { +int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8; + +Status BinlogEvent::init(BufferSlice &&raw_event, bool check_crc) { + TlParser parser(raw_event.as_slice()); + size_ = parser.fetch_int(); + CHECK(size_ == raw_event.size()); + id_ = parser.fetch_long(); + type_ = parser.fetch_int(); + flags_ = parser.fetch_int(); + extra_ = parser.fetch_long(); + CHECK(size_ >= MIN_EVENT_SIZE); + auto slice_data = parser.fetch_string_raw<Slice>(size_ - MIN_EVENT_SIZE); + data_ = MutableSlice(const_cast<char *>(slice_data.begin()), slice_data.size()); + crc32_ = static_cast<uint32>(parser.fetch_int()); + if (check_crc) { + CHECK(size_ >= EVENT_TAIL_SIZE); + auto calculated_crc = crc32(raw_event.as_slice().truncate(size_ - EVENT_TAIL_SIZE)); + if (calculated_crc != crc32_) { + return Status::Error(PSLICE() << "crc mismatch " << tag("actual", format::as_hex(calculated_crc)) + << tag("expected", format::as_hex(crc32_))); + } + } + raw_event_ = std::move(raw_event); + return Status::OK(); +} + +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/BinlogEvent.h b/libs/tdlib/td/tddb/td/db/binlog/BinlogEvent.h new file mode 100644 index 0000000000..1874543dff --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/BinlogEvent.h @@ -0,0 +1,109 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/buffer.h" +#include "td/utils/common.h" +#include "td/utils/crypto.h" +#include "td/utils/format.h" +#include "td/utils/logging.h" +#include "td/utils/misc.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" +#include "td/utils/Storer.h" +#include "td/utils/StringBuilder.h" +#include "td/utils/tl_storers.h" + +namespace td { +struct EmptyStorerImpl { + EmptyStorerImpl() { + } + + template <class StorerT> + void store(StorerT &storer) const { + } +}; + +inline auto EmptyStorer() { + static const EmptyStorerImpl impl; + return create_default_storer(impl); +} + +static constexpr size_t MAX_EVENT_SIZE = 1 << 24; +static constexpr size_t EVENT_HEADER_SIZE = 4 + 8 + 4 + 4 + 8; +static constexpr size_t EVENT_TAIL_SIZE = 4; +static constexpr size_t MIN_EVENT_SIZE = EVENT_HEADER_SIZE + EVENT_TAIL_SIZE; + +extern int32 VERBOSITY_NAME(binlog); + +// TODO: smaller BinlogEvent +struct BinlogEvent { + int64 offset_; + + uint32 size_; + uint64 id_; + int32 type_; // type can be merged with flags + int32 flags_; + uint64 extra_; + MutableSlice data_; + uint32 crc32_; + + BufferSlice raw_event_; + + enum ServiceTypes { Header = -1, Empty = -2, AesCtrEncryption = -3, NoEncryption = -4 }; + enum Flags { Rewrite = 1, Partial = 2 }; + + void clear() { + raw_event_ = BufferSlice(); + } + bool empty() const { + return raw_event_.empty(); + } + BinlogEvent clone() const { + BinlogEvent result; + result.init(raw_event_.clone()).ensure(); + return result; + } + + BufferSlice data_as_buffer_slice() const { + return raw_event_.from_slice(data_); + } + + BinlogEvent() = default; + explicit BinlogEvent(BufferSlice &&raw_event) { + init(std::move(raw_event), false).ensure(); + } + Status init(BufferSlice &&raw_event, bool check_crc = true) TD_WARN_UNUSED_RESULT; + + static BufferSlice create_raw(uint64 id, int32 type, int32 flags, const Storer &storer); +}; + +inline StringBuilder &operator<<(StringBuilder &sb, const BinlogEvent &event) { + return sb << "LogEvent[" << tag("id", format::as_hex(event.id_)) << tag("type", event.type_) + << tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.data_)) << "]"; +} + +// Implementation +inline BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) { + auto raw_event = BufferSlice{storer.size() + MIN_EVENT_SIZE}; + + TlStorerUnsafe tl_storer(raw_event.as_slice().begin()); + tl_storer.store_int(narrow_cast<int32>(raw_event.size())); + tl_storer.store_long(id); + tl_storer.store_int(type); + tl_storer.store_int(flags); + tl_storer.store_long(0); + + CHECK(tl_storer.get_buf() == raw_event.as_slice().begin() + EVENT_HEADER_SIZE); + tl_storer.store_storer(storer); + + CHECK(tl_storer.get_buf() == raw_event.as_slice().end() - EVENT_TAIL_SIZE); + tl_storer.store_int(::td::crc32(raw_event.as_slice().truncate(raw_event.size() - EVENT_TAIL_SIZE))); + + return raw_event; +} +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/BinlogHelper.h b/libs/tdlib/td/tddb/td/db/binlog/BinlogHelper.h new file mode 100644 index 0000000000..1224ac4ac2 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/BinlogHelper.h @@ -0,0 +1,45 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/PromiseFuture.h" + +#include "td/db/binlog/BinlogEvent.h" + +#include "td/utils/common.h" +#include "td/utils/Storer.h" + +namespace td { +class BinlogHelper { + public: + template <class BinlogT, class StorerT> + static uint64 add(const BinlogT &binlog_ptr, int32 type, const StorerT &storer, Promise<> promise = Promise<>()) { + auto logevent_id = binlog_ptr->next_id(); + binlog_ptr->add_raw_event(logevent_id, BinlogEvent::create_raw(logevent_id, type, 0, storer), std::move(promise)); + return logevent_id; + } + + template <class BinlogT, class StorerT> + static uint64 rewrite(const BinlogT &binlog_ptr, uint64 logevent_id, int32 type, const StorerT &storer, + Promise<> promise = Promise<>()) { + auto seq_no = binlog_ptr->next_id(); + binlog_ptr->add_raw_event(seq_no, BinlogEvent::create_raw(logevent_id, type, BinlogEvent::Flags::Rewrite, storer), + std::move(promise)); + return seq_no; + } + + template <class BinlogT> + static uint64 erase(const BinlogT &binlog_ptr, uint64 logevent_id, Promise<> promise = Promise<>()) { + auto seq_no = binlog_ptr->next_id(); + binlog_ptr->add_raw_event(seq_no, + BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty, + BinlogEvent::Flags::Rewrite, EmptyStorer()), + std::move(promise)); + return seq_no; + } +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/BinlogInterface.h b/libs/tdlib/td/tddb/td/db/binlog/BinlogInterface.h new file mode 100644 index 0000000000..2360f3c480 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/BinlogInterface.h @@ -0,0 +1,51 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/PromiseFuture.h" + +#include "td/db/binlog/BinlogEvent.h" +#include "td/db/DbKey.h" + +#include "td/utils/buffer.h" +#include "td/utils/common.h" + +namespace td { +class BinlogInterface { + public: + BinlogInterface() = default; + BinlogInterface(const BinlogInterface &) = delete; + BinlogInterface &operator=(const BinlogInterface &) = delete; + BinlogInterface(BinlogInterface &&) = delete; + BinlogInterface &operator=(BinlogInterface &&) = delete; + virtual ~BinlogInterface() = default; + + void close(Promise<> promise = {}) { + close_impl(std::move(promise)); + } + void close_and_destroy(Promise<> promise = {}) { + close_and_destroy_impl(std::move(promise)); + } + void add_raw_event(uint64 id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) { + add_raw_event_impl(id, std::move(raw_event), std::move(promise)); + } + void lazy_sync(Promise<> promise = Promise<>()) { + add_raw_event_impl(next_id(), BufferSlice(), std::move(promise)); + } + virtual void force_sync(Promise<> promise) = 0; + virtual void force_flush() = 0; + virtual void change_key(DbKey db_key, Promise<> promise = Promise<>()) = 0; + + virtual uint64 next_id() = 0; + virtual uint64 next_id(int32 shift) = 0; + + protected: + virtual void close_impl(Promise<> promise) = 0; + virtual void close_and_destroy_impl(Promise<> promise) = 0; + virtual void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) = 0; +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp b/libs/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp new file mode 100644 index 0000000000..aaf07f2967 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.cpp @@ -0,0 +1,203 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/binlog/ConcurrentBinlog.h" + +#include "td/utils/logging.h" +#include "td/utils/OrderedEventsProcessor.h" +#include "td/utils/Time.h" + +#include <map> + +namespace td { +namespace detail { +class BinlogActor : public Actor { + public: + BinlogActor(std::unique_ptr<Binlog> binlog, uint64 seq_no) : binlog_(std::move(binlog)), processor_(seq_no) { + } + void close(Promise<> promise) { + binlog_->close().ensure(); + promise.set_value(Unit()); + LOG(INFO) << "close: done"; + stop(); + } + void close_and_destroy(Promise<> promise) { + binlog_->close_and_destroy().ensure(); + promise.set_value(Unit()); + LOG(INFO) << "close_and_destroy: done"; + stop(); + } + + struct Event { + BufferSlice raw_event; + Promise<> sync_promise; + }; + void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise) { + processor_.add(seq_no, Event{std::move(raw_event), std::move(promise)}, [&](uint64 id, Event &&event) { + if (!event.raw_event.empty()) { + do_add_raw_event(std::move(event.raw_event)); + } + do_lazy_sync(std::move(event.sync_promise)); + }); + flush_immediate_sync(); + try_flush(); + } + + void force_sync(Promise<> &&promise) { + auto seq_no = processor_.max_unfinished_seq_no(); + if (processor_.max_finished_seq_no() == seq_no) { + do_immediate_sync(std::move(promise)); + } else { + immediate_sync_promises_.emplace(seq_no, std::move(promise)); + } + } + + void force_flush() { + // TODO: use same logic as in force_sync + binlog_->flush(); + flush_flag_ = false; + } + + void change_key(DbKey db_key, Promise<> promise) { + binlog_->change_key(std::move(db_key)); + promise.set_value(Unit()); + } + + private: + std::unique_ptr<Binlog> binlog_; + + OrderedEventsProcessor<Event> processor_; + + std::multimap<uint64, Promise<>> immediate_sync_promises_; + std::vector<Promise<>> sync_promises_; + bool force_sync_flag_ = false; + bool lazy_sync_flag_ = false; + bool flush_flag_ = false; + double wakeup_at_ = 0; + + static constexpr int32 FLUSH_TIMEOUT = 1; // 1s + + void wakeup_after(double after) { + auto now = Time::now_cached(); + wakeup_at(now + after); + } + + void wakeup_at(double at) { + if (wakeup_at_ == 0 || wakeup_at_ > at) { + wakeup_at_ = at; + set_timeout_at(wakeup_at_); + } + } + + void do_add_raw_event(BufferSlice &&raw_event) { + binlog_->add_raw_event(std::move(raw_event)); + } + + void try_flush() { + auto need_flush_since = binlog_->need_flush_since(); + auto now = Time::now_cached(); + if (now > need_flush_since + FLUSH_TIMEOUT - 1e-9) { + binlog_->flush(); + } else { + if (!force_sync_flag_) { + flush_flag_ = true; + wakeup_at(need_flush_since + FLUSH_TIMEOUT); + } + } + } + + void flush_immediate_sync() { + auto seq_no = processor_.max_finished_seq_no(); + for (auto it = immediate_sync_promises_.begin(), end = immediate_sync_promises_.end(); + it != end && it->first <= seq_no; it = immediate_sync_promises_.erase(it)) { + do_immediate_sync(std::move(it->second)); + } + } + + void do_immediate_sync(Promise<> &&promise) { + if (promise) { + sync_promises_.emplace_back(std::move(promise)); + } + if (!force_sync_flag_) { + force_sync_flag_ = true; + wakeup_after(0.003); + } + } + + void do_lazy_sync(Promise<> &&promise) { + if (!promise) { + return; + } + sync_promises_.emplace_back(std::move(promise)); + if (!lazy_sync_flag_ && !force_sync_flag_) { + wakeup_after(30); + lazy_sync_flag_ = true; + } + } + + void timeout_expired() override { + bool need_sync = lazy_sync_flag_ || force_sync_flag_; + lazy_sync_flag_ = false; + force_sync_flag_ = false; + bool need_flush = flush_flag_; + flush_flag_ = false; + wakeup_at_ = 0; + if (need_sync) { + binlog_->sync(); + // LOG(ERROR) << "BINLOG SYNC"; + for (auto &promise : sync_promises_) { + promise.set_value(Unit()); + } + sync_promises_.clear(); + } else if (need_flush) { + try_flush(); + // LOG(ERROR) << "BINLOG FLUSH"; + } + } +}; +} // namespace detail + +ConcurrentBinlog::ConcurrentBinlog() = default; +ConcurrentBinlog::~ConcurrentBinlog() = default; +ConcurrentBinlog::ConcurrentBinlog(std::unique_ptr<Binlog> binlog, int scheduler_id) { + init_impl(std::move(binlog), scheduler_id); +} + +Result<BinlogInfo> ConcurrentBinlog::init(string path, const Callback &callback, DbKey db_key, DbKey old_db_key, + int scheduler_id) { + auto binlog = std::make_unique<Binlog>(); + TRY_STATUS(binlog->init(std::move(path), callback, std::move(db_key), std::move(old_db_key))); + auto info = binlog->get_info(); + init_impl(std::move(binlog), scheduler_id); + return info; +} + +void ConcurrentBinlog::init_impl(std::unique_ptr<Binlog> binlog, int32 scheduler_id) { + path_ = binlog->get_path().str(); + last_id_ = binlog->peek_next_id(); + binlog_actor_ = + create_actor_on_scheduler<detail::BinlogActor>("Binlog " + path_, scheduler_id, std::move(binlog), last_id_); +} + +void ConcurrentBinlog::close_impl(Promise<> promise) { + send_closure(std::move(binlog_actor_), &detail::BinlogActor::close, std::move(promise)); +} +void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) { + send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise)); +} +void ConcurrentBinlog::add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) { + send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, id, std::move(raw_event), std::move(promise)); +} +void ConcurrentBinlog::force_sync(Promise<> promise) { + send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise)); +} +void ConcurrentBinlog::force_flush() { + send_closure(binlog_actor_, &detail::BinlogActor::force_flush); +} +void ConcurrentBinlog::change_key(DbKey db_key, Promise<> promise) { + send_closure(binlog_actor_, &detail::BinlogActor::change_key, std::move(db_key), std::move(promise)); +} +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h b/libs/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h new file mode 100644 index 0000000000..ce77a78a84 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/ConcurrentBinlog.h @@ -0,0 +1,68 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" +#include "td/actor/PromiseFuture.h" + +#include "td/db/binlog/Binlog.h" +#include "td/db/binlog/BinlogInterface.h" + +#include "td/utils/buffer.h" +#include "td/utils/common.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" + +#include <atomic> +#include <functional> + +namespace td { + +namespace detail { +class BinlogActor; +} // namespace detail + +class ConcurrentBinlog : public BinlogInterface { + public: + using Callback = std::function<void(const BinlogEvent &)>; + Result<BinlogInfo> init(string path, const Callback &callback, DbKey db_key = DbKey::empty(), + DbKey old_db_key = DbKey::empty(), int scheduler_id = -1) TD_WARN_UNUSED_RESULT; + + ConcurrentBinlog(); + explicit ConcurrentBinlog(std::unique_ptr<Binlog> binlog, int scheduler_id = -1); + ConcurrentBinlog(const ConcurrentBinlog &other) = delete; + ConcurrentBinlog &operator=(const ConcurrentBinlog &other) = delete; + ConcurrentBinlog(ConcurrentBinlog &&other) = delete; + ConcurrentBinlog &operator=(ConcurrentBinlog &&other) = delete; + ~ConcurrentBinlog() override; + + void force_sync(Promise<> promise) override; + void force_flush() override; + void change_key(DbKey db_key, Promise<> promise) override; + + uint64 next_id() override { + return last_id_.fetch_add(1, std::memory_order_relaxed); + } + uint64 next_id(int32 shift) override { + return last_id_.fetch_add(shift, std::memory_order_relaxed); + } + + CSlice get_path() const { + return path_; + } + + private: + void init_impl(std::unique_ptr<Binlog> binlog, int scheduler_id); + void close_impl(Promise<> promise) override; + void close_and_destroy_impl(Promise<> promise) override; + void add_raw_event_impl(uint64 id, BufferSlice &&raw_event, Promise<> promise) override; + + ActorOwn<detail::BinlogActor> binlog_actor_; + string path_; + std::atomic<uint64> last_id_; +}; +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp b/libs/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp new file mode 100644 index 0000000000..a8b8bf9e1b --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/binlog_dump.cpp @@ -0,0 +1,53 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/binlog/Binlog.h" + +#include "td/utils/common.h" +#include "td/utils/format.h" +#include "td/utils/logging.h" + +#include <cstdio> +#include <map> + +int main(int argc, char *argv[]) { + if (argc < 2) { + std::fprintf(stderr, "Usage: binlog_dump <binlog_file_name>\n"); + return 1; + } + + struct Info { + std::size_t full_size = 0; + std::size_t compressed_size = 0; + }; + std::map<td::uint64, Info> info; + + SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR)); + td::Binlog binlog; + binlog + .init(argv[1], + [&](auto &event) { + info[0].compressed_size += event.raw_event_.size(); + info[event.type_].compressed_size += event.raw_event_.size(); + }, + td::DbKey::empty(), td::DbKey::empty(), -1, + [&](auto &event) mutable { + info[0].full_size += event.raw_event_.size(); + info[event.type_].full_size += event.raw_event_.size(); + LOG(PLAIN) << "LogEvent[" << td::tag("id", td::format::as_hex(event.id_)) << td::tag("type", event.type_) + << td::tag("flags", event.flags_) << td::tag("data", td::format::escaped(event.data_)) + << "]\n"; + }) + .ensure(); + + for (auto &it : info) { + LOG(ERROR) << td::tag("handler", td::format::as_hex(it.first)) + << td::tag("full_size", td::format::as_size(it.second.full_size)) + << td::tag("compressed_size", td::format::as_size(it.second.compressed_size)); + } + + return 0; +} diff --git a/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp new file mode 100644 index 0000000000..b7ddc98ff1 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.cpp @@ -0,0 +1,39 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/binlog/detail/BinlogEventsBuffer.h" + +#include <algorithm> + +namespace td { +namespace detail { +void BinlogEventsBuffer::add_event(BinlogEvent &&event) { + total_events_++; + if ((event.flags_ & BinlogEvent::Flags::Partial) == 0) { + auto it = std::find(ids_.begin(), ids_.end(), event.id_); + if (it != ids_.end()) { + auto &to_event = events_[it - ids_.begin()]; + size_ -= to_event.size_; + to_event = std::move(event); + size_ += to_event.size_; + return; + } + } + ids_.push_back(event.id_); + size_ += event.size_; + events_.push_back(std::move(event)); +} +bool BinlogEventsBuffer::need_flush() const { + return total_events_ > 5000 || ids_.size() > 100; +} +void BinlogEventsBuffer::clear() { + ids_.clear(); + events_.clear(); + total_events_ = 0; + size_ = 0; +} +} // namespace detail +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h new file mode 100644 index 0000000000..dcd6d7c1b3 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsBuffer.h @@ -0,0 +1,47 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once +#include "td/db/binlog/BinlogEvent.h" + +#include "td/utils/common.h" + +namespace td { +namespace detail { +class BinlogEventsBuffer { + public: + void add_event(BinlogEvent &&event); + + bool need_flush() const; + + template <class CallbackT> + void flush(CallbackT &&callback) { + for (size_t i = 0; i < ids_.size(); i++) { + auto &event = events_[i]; + if (i + 1 != ids_.size() && (event.flags_ & BinlogEvent::Flags::Partial) == 0) { + callback(BinlogEvent(BinlogEvent::create_raw(event.id_, event.type_, event.flags_ | BinlogEvent::Flags::Partial, + create_storer(event.data_)))); + } else { + callback(std::move(event)); + } + } + clear(); + } + size_t size() const { + return size_; + } + + private: + vector<uint64> ids_; + vector<BinlogEvent> events_; + size_t total_events_{0}; + size_t size_{0}; + + void do_event(BinlogEvent &&event); + void clear(); +}; +} // namespace detail +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp new file mode 100644 index 0000000000..50ad91bff8 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.cpp @@ -0,0 +1,70 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/binlog/detail/BinlogEventsProcessor.h" + +#include "td/utils/logging.h" + +#include <algorithm> + +namespace td { +namespace detail { +void BinlogEventsProcessor::do_event(BinlogEvent &&event) { + offset_ = event.offset_; + auto fixed_id = event.id_ * 2; + if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !ids_.empty() && ids_.back() >= fixed_id) { + auto it = std::lower_bound(ids_.begin(), ids_.end(), fixed_id); + if (it == ids_.end() || *it != fixed_id) { + LOG(FATAL) << "Ignore rewrite logevent"; + return; + } + auto pos = it - ids_.begin(); + total_raw_events_size_ -= static_cast<int64>(events_[pos].raw_event_.size()); + if (event.type_ == BinlogEvent::ServiceTypes::Empty) { + *it += 1; + empty_events_++; + events_[pos].clear(); + } else { + event.flags_ &= ~BinlogEvent::Flags::Rewrite; + total_raw_events_size_ += static_cast<int64>(event.raw_event_.size()); + events_[pos] = std::move(event); + } + } else if (event.type_ == BinlogEvent::ServiceTypes::Empty) { + // just skip this event + } else { + CHECK(ids_.empty() || ids_.back() < fixed_id); + last_id_ = event.id_; + total_raw_events_size_ += static_cast<int64>(event.raw_event_.size()); + total_events_++; + ids_.push_back(fixed_id); + events_.emplace_back(std::move(event)); + } + + if (total_events_ > 10 && empty_events_ * 4 > total_events_ * 3) { + compactify(); + } +} + +void BinlogEventsProcessor::compactify() { + CHECK(ids_.size() == events_.size()); + auto ids_from = ids_.begin(); + auto ids_to = ids_from; + auto events_from = events_.begin(); + auto events_to = events_from; + for (; ids_from != ids_.end(); ids_from++, events_from++) { + if ((*ids_from & 1) == 0) { + *ids_to++ = *ids_from; + *events_to++ = std::move(*events_from); + } + } + ids_.erase(ids_to, ids_.end()); + events_.erase(events_to, events_.end()); + total_events_ = ids_.size(); + empty_events_ = 0; + CHECK(ids_.size() == events_.size()); +} +} // namespace detail +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h new file mode 100644 index 0000000000..645f8c50ab --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/binlog/detail/BinlogEventsProcessor.h @@ -0,0 +1,53 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once +#include "td/db/binlog/BinlogEvent.h" + +#include "td/utils/common.h" + +namespace td { +namespace detail { +class BinlogEventsProcessor { + public: + void add_event(BinlogEvent &&event) { + do_event(std::move(event)); + } + + template <class CallbackT> + void for_each(CallbackT &&callback) { + for (size_t i = 0; i < ids_.size(); i++) { + if ((ids_[i] & 1) == 0) { + callback(events_[i]); + } + } + } + + uint64 last_id() const { + return last_id_; + } + int64 offset() const { + return offset_; + } + int64 total_raw_events_size() const { + return total_raw_events_size_; + } + + private: + // holds (id * 2 + was_deleted) + std::vector<uint64> ids_; + std::vector<BinlogEvent> events_; + size_t total_events_{0}; + size_t empty_events_{0}; + uint64 last_id_{0}; + int64 offset_{0}; + int64 total_raw_events_size_{0}; + + void do_event(BinlogEvent &&event); + void compactify(); +}; +} // namespace detail +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp b/libs/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp new file mode 100644 index 0000000000..6ee1b45af2 --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/detail/RawSqliteDb.cpp @@ -0,0 +1,39 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/detail/RawSqliteDb.h" + +#include "sqlite/sqlite3.h" + +#include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/port/path.h" + +namespace td { +namespace detail { +Status RawSqliteDb::last_error(sqlite3 *db) { + return Status::Error(Slice(sqlite3_errmsg(db))); +} +Status RawSqliteDb::destroy(Slice path) { + with_db_path(path, [](auto path) { unlink(path).ignore(); }); + return Status::OK(); +} +Status RawSqliteDb::last_error() { + //If database was corrupted, try to delete it. + auto code = sqlite3_errcode(db_); + if (code == SQLITE_CORRUPT) { + destroy(path_).ignore(); + } + + return last_error(db_); +} +RawSqliteDb::~RawSqliteDb() { + auto rc = sqlite3_close(db_); + LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_); +} + +} // namespace detail +} // namespace td diff --git a/libs/tdlib/td/tddb/td/db/detail/RawSqliteDb.h b/libs/tdlib/td/tddb/td/db/detail/RawSqliteDb.h new file mode 100644 index 0000000000..801f3b192a --- /dev/null +++ b/libs/tdlib/td/tddb/td/db/detail/RawSqliteDb.h @@ -0,0 +1,51 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/logging.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" + +struct sqlite3; + +namespace td { +namespace detail { +class RawSqliteDb { + public: + RawSqliteDb(sqlite3 *db, std::string path) : db_(db), path_(std::move(path)) { + } + RawSqliteDb(const RawSqliteDb &) = delete; + RawSqliteDb(RawSqliteDb &&) = delete; + RawSqliteDb &operator=(const RawSqliteDb &) = delete; + RawSqliteDb &operator=(RawSqliteDb &&) = delete; + ~RawSqliteDb(); + + template <class F> + static void with_db_path(Slice main_path, F &&f) { + f(PSLICE() << main_path); + f(PSLICE() << main_path << "-journal"); + f(PSLICE() << main_path << "-shm"); + f(PSLICE() << main_path << "-wal"); + } + static Status destroy(Slice path) TD_WARN_UNUSED_RESULT; + + sqlite3 *db() { + return db_; + } + CSlice path() const { + return path_; + } + + Status last_error(); + static Status last_error(sqlite3 *db); + + private: + sqlite3 *db_; + std::string path_; +}; +}; // namespace detail +} // namespace td |